OS-1566 dataset quota for ZFS datasets

@@ -19,10 +19,11 @@
  * CDDL HEADER END
  */
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  * Copyright (c) 2012 by Delphix. All rights reserved.
+ * Copyright (c) 2012 Joyent, Inc. All rights reserved.
  */
 
 #include <sys/dmu.h>
 #include <sys/dmu_objset.h>
 #include <sys/dmu_tx.h>

@@ -36,16 +37,77 @@
 #include <sys/zap.h>
 #include <sys/zio.h>
 #include <sys/arc.h>
 #include <sys/sunddi.h>
 #include <sys/zfs_zone.h>
+#include <sys/zfeature.h>
 #include "zfs_namecheck.h"
 
+/*
+ * Dataset and Snapshot Quotas
+ * ---------------------------
+ *
+ * These quotas are used to limit the number of datasets and/or snapshots
+ * that can be created at a given level in the tree or below. A common use-case
+ * is with a delegated dataset where the administrator wants to ensure that
+ * a user within the zone is not creating too many datasets or snapshots, even
+ * though they're not exceeding their space quota.
+ *
+ * The count of datasets and snapshots is stored in the dsl_dir_phys_t which
+ * impacts the on-disk format. As such, this capability is controlled by a
+ * feature flag and must be enabled to be used. Once enabled, the feature is
+ * not active until the first quota is set. At that point, future operations to
+ * create/destroy datasets or snapshots will validate and update the counts.
+ *
+ * Because the on-disk counts will be incorrect (garbage) before the feature is
+ * active, the counts are updated when the quota is first set. Starting at the
+ * dataset with the new quota, the code descends into all sub-datasets and
+ * updates the counts to be accurate. In practice this is lightweight since
+ * a quota is typically set when the dataset is created and thus has no
+ * children. Once set, changing the quota value won't require a traversal since
+ * the counts are already valid. The counts in datasets above the one with the
+ * new quota will still be incorrect, unless a quota is eventually set on one
+ * of those datasets. If a dataset with a quota is encountered during the
+ * descent, the counts are known to be valid and there is no need to descend
+ * into that dataset's children. When a new quota value is set on a dataset
+ * with an existing quota, the new value must not be less than the current
+ * count at that level or an error is returned and the quota is not changed.
+ *
+ * Once the feature is active, whenever a dataset or snapshot is created,
+ * the code recurses up the tree, validating the new count against the quota
+ * at each level. In practice, most levels will not have a quota set. If there
+ * is a quota at any level up the tree, the check must pass or the creation
+ * will fail. Likewise, when a dataset or snapshot is destroyed, the counts
+ * are recursively adjusted all the way up the tree. Renaming a dataset into a
+ * different point in the tree will first validate, then update the counts on
+ * each branch up to the common ancestor. A receive will also validate the
+ * counts and then update them.
+ *
+ * Recursive snapshots behave a bit differently. The quota is only validated
+ * against the top-level dataset at which the snapshot is being taken. This
+ * is to prevent a denial-of-service in which a lower level dataset could
+ * max out its quota and thus block snapshots from being taken at a higher
+ * level (in addition, the complexity to address this is not worth the cost).
+ * Because of this, it is possible for the snapshot count to end up over the
+ * quota: a snapshot taken at a higher level can push a lower-level dataset to,
+ * or past, its quota. The administrator taking the high-level recursive
+ * snapshot should be aware of this side-effect and behave accordingly.
+ *
+ * The dataset quota is validated by dsl_dir_dscount_check() and updated by
+ * dsl_dir_dscount_adjust(). The snapshot quota is validated by
+ * dsl_snapcount_check() and updated by dsl_snapcount_adjust().
+ * A new quota value is validated in dsl_dir_validate_ds_ss_quota() and the
+ * dataset counts are adjusted, if necessary, by dsl_dir_set_ds_ss_count().
+ */
+
 static uint64_t dsl_dir_space_towrite(dsl_dir_t *dd);
 static void dsl_dir_set_reservation_sync_impl(dsl_dir_t *dd,
     uint64_t value, dmu_tx_t *tx);
 
+extern dsl_syncfunc_t dsl_prop_set_sync;
+extern char *tmp_dmu_recv_tag;
+
 /* ARGSUSED */
 static void
 dsl_dir_evict(dmu_buf_t *db, void *arg)
 {
         dsl_dir_t *dd = arg;

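The comment above describes two traversals: an upward walk that runs on every
create/destroy once the feature is active, and a one-time downward fix-up when
a quota is first set. A rough userland model may make the shape easier to
follow; this is a sketch only, and node_t, quota_check(), quota_adjust() and
count_fixup() are invented names, not the kernel's types or functions.

    #include <errno.h>
    #include <stdint.h>
    #include <stddef.h>

    /* Invented model type; not the kernel's dsl_dir_t. */
    typedef struct node {
            struct node *parent;
            struct node *child;     /* first child */
            struct node *sibling;   /* next sibling */
            uint64_t count;         /* datasets strictly below this node */
            uint64_t quota;         /* 0 means no quota at this level */
    } node_t;

    /*
     * Upward walk: validate creating 'cnt' datasets against every quota
     * between here and the root, as dsl_dir_dscount_check() does.
     */
    static int
    quota_check(node_t *dd, uint64_t cnt)
    {
            for (; dd != NULL; dd = dd->parent)
                    if (dd->quota > 0 && dd->count + cnt > dd->quota)
                            return (EDQUOT);
            return (0);
    }

    /*
     * Upward walk: commit the change once the check has passed, as
     * dsl_dir_dscount_adjust() does.
     */
    static void
    quota_adjust(node_t *dd, int64_t delta)
    {
            for (; dd != NULL; dd = dd->parent)
                    dd->count += delta;
    }

    /*
     * Downward post-order fix-up, run before the new quota value is
     * stored (as the kernel does): fix each child's count, then sum the
     * children into the parent. An existing quota proves a subtree's
     * counts are valid, so there is no need to descend into it.
     */
    static uint64_t
    count_fixup(node_t *dd)
    {
            uint64_t n = 0;
            node_t *c;

            if (dd->quota > 0)
                    return (dd->count + 1);         /* +1 for self */
            for (c = dd->child; c != NULL; c = c->sibling)
                    n += count_fixup(c);
            dd->count = n;
            return (n + 1);
    }
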
@@ -405,10 +467,319 @@
 dsl_dir_open(const char *name, void *tag, dsl_dir_t **ddp, const char **tailp)
 {
         return (dsl_dir_open_spa(NULL, name, tag, ddp, tailp));
 }
 
+/*
+ * Check if there is already a dataset/snapshot quota set for the dataset. If
+ * not, then the counts on this dataset, and those below, may be incorrect due
+ * to the use of a pre-existing pool which did not support the dataset/snapshot
+ * quota feature.
+ *
+ * Recursively descend the dataset tree and update the dataset/snapshot counts
+ * on each dataset below, then update the cumulative count on the current
+ * dataset. If the dataset already has a quota set on it, then we know that
+ * its counts, and the counts on the datasets below it, have been updated to
+ * be correct, so we can skip that dataset.
+ */
+static void
+dsl_dir_set_ds_ss_count(const char *nm, dsl_dir_t *dd, dmu_tx_t *tx,
+    uint64_t *dscnt, uint64_t *sscnt)
+{
+        uint64_t my_ds_cnt = 0;
+        uint64_t my_ss_cnt = 0;
+        objset_t *os = dd->dd_pool->dp_meta_objset;
+        zap_cursor_t *zc;
+        zap_attribute_t *za;
+        char *namebuf;
+        int err;
+        boolean_t quota_set = B_FALSE;
+        uint64_t dsquota, ssquota;
+        dsl_dataset_t *ds;
+
+        err = dsl_prop_get_dd(dd, zfs_prop_to_name(ZFS_PROP_DATASET_QUOTA),
+            8, 1, &dsquota, NULL, B_FALSE);
+        if (err == 0 && dsquota != 0)
+                quota_set = B_TRUE;
+
+        if (!quota_set) {
+                err = dsl_prop_get_dd(dd,
+                    zfs_prop_to_name(ZFS_PROP_SNAPSHOT_QUOTA), 8, 1, &ssquota,
+                    NULL, B_FALSE);
+                if (err == 0 && ssquota != 0)
+                        quota_set = B_TRUE;
+        }
+
+        /*
+         * If the dd has a quota, we know its count is already good and we
+         * don't need to recurse down any further.
+         */
+        if (quota_set) {
+                /* Return dataset count plus 1 for self */
+                *dscnt = dd->dd_phys->dd_dataset_count + 1;
+                *sscnt = dd->dd_phys->dd_snapshot_count;
+
+                return;
+        }
+
+        zc = kmem_alloc(sizeof (zap_cursor_t), KM_SLEEP);
+        za = kmem_alloc(sizeof (zap_attribute_t), KM_SLEEP);
+        namebuf = kmem_alloc(MAXPATHLEN, KM_SLEEP);
+
+        mutex_enter(&dd->dd_lock);
+
+        /* Iterate datasets */
+        for (zap_cursor_init(zc, os, dd->dd_phys->dd_child_dir_zapobj);
+            zap_cursor_retrieve(zc, za) == 0;
+            zap_cursor_advance(zc)) {
+                dsl_dir_t *chld_dd;
+                uint64_t chld_ds_cnt = 0;
+                uint64_t chld_ss_cnt = 0;
+
+                (void) snprintf(namebuf, MAXPATHLEN, "%s/%s", nm, za->za_name);
+
+                if (dsl_dir_open(namebuf, FTAG, &chld_dd, NULL))
+                        continue;
+
+                dsl_dir_set_ds_ss_count(namebuf, chld_dd, tx, &chld_ds_cnt,
+                    &chld_ss_cnt);
+
+                dsl_dir_close(chld_dd, FTAG);
+
+                my_ds_cnt += chld_ds_cnt;
+                my_ss_cnt += chld_ss_cnt;
+        }
+        zap_cursor_fini(zc);
+
+        kmem_free(namebuf, MAXPATHLEN);
+
+        /* Iterate snapshots */
+        if (dsl_dataset_hold(nm, FTAG, &ds) == 0) {
+                for (zap_cursor_init(zc, os, ds->ds_phys->ds_snapnames_zapobj);
+                    zap_cursor_retrieve(zc, za) == 0;
+                    zap_cursor_advance(zc)) {
+                        my_ss_cnt++;
+                }
+                zap_cursor_fini(zc);
+                dsl_dataset_rele(ds, FTAG);
+        }
+
+        kmem_free(zc, sizeof (zap_cursor_t));
+        kmem_free(za, sizeof (zap_attribute_t));
+
+#ifdef _KERNEL
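+        /* Raw SDT probe: lets the count fix-up be observed with dtrace(1M) */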
+        extern void __dtrace_probe_zfs__ds__fix__count(char *, uint64_t,
+            uint64_t);
+        __dtrace_probe_zfs__ds__fix__count((char *)nm, my_ds_cnt, my_ss_cnt);
+#endif
+
+        /* save updated counts */
+        dmu_buf_will_dirty(dd->dd_dbuf, tx);
+        dd->dd_phys->dd_dataset_count = my_ds_cnt;
+        dd->dd_phys->dd_snapshot_count = my_ss_cnt;
+
+        mutex_exit(&dd->dd_lock);
+
+        /* Return child dataset count plus 1 for self */
+        *dscnt = my_ds_cnt + 1;
+        *sscnt = my_ss_cnt;
+}
+
+/*
+ * Return ENOSPC if the new quota is less than the existing count; otherwise
+ * return -1 to force the zfs_set_prop_nvlist code down the default path to
+ * set the value in the nvlist.
+ */
+int
+dsl_dir_validate_ds_ss_quota(const char *ddname, uint64_t quota,
+    zfs_prop_t ptype)
+{
+        dsl_dir_t *dd;
+        dsl_dataset_t *ds;
+        int err = -1;
+        uint64_t count;
+        dmu_tx_t *tx;
+        uint64_t my_ds_cnt = 0;
+        uint64_t my_ss_cnt = 0;
+        spa_t *spa;
+        zfeature_info_t *quota_feat =
+            &spa_feature_table[SPA_FEATURE_DS_SS_QUOTA];
+
+        if (dsl_dataset_hold(ddname, FTAG, &ds))
+                return (EACCES);
+
+        spa = dsl_dataset_get_spa(ds);
+        if (!spa_feature_is_enabled(spa,
+            &spa_feature_table[SPA_FEATURE_DS_SS_QUOTA])) {
+                dsl_dataset_rele(ds, FTAG);
+                return (ENOTSUP);
+        }
+
+        /* 0 means no quota */
+        if (quota == 0) {
+                dsl_dataset_rele(ds, FTAG);
+                return (-1);
+        }
+
+        if (dsl_dir_open(ddname, FTAG, &dd, NULL)) {
+                dsl_dataset_rele(ds, FTAG);
+                return (EACCES);
+        }
+
+        ASSERT(ds->ds_dir == dd);
+
+        tx = dmu_tx_create_dd(dd);
+        if (dmu_tx_assign(tx, TXG_WAIT)) {
+                dmu_tx_abort(tx);
+                dsl_dir_close(dd, FTAG);
+                dsl_dataset_rele(ds, FTAG);
+                return (ENOSPC);
+        }
+
+        /* set the feature active flag now */
+        if (!spa_feature_is_active(spa, quota_feat))
+                spa_feature_incr(spa, quota_feat, tx);
+
+        /*
+         * Since we are now setting a non-0 quota on the dataset, we need to
+         * ensure the counts are correct. Descend down the tree from this
+         * point and update all of the counts to be accurate.
+         */
+        rw_enter(&dd->dd_pool->dp_config_rwlock, RW_READER);
+        dsl_dir_set_ds_ss_count(ddname, dd, tx, &my_ds_cnt, &my_ss_cnt);
+        rw_exit(&dd->dd_pool->dp_config_rwlock);
+
+        dmu_tx_commit(tx);
+
+        if (ptype == ZFS_PROP_DATASET_QUOTA)
+                count = dd->dd_phys->dd_dataset_count;
+        else
+                count = dd->dd_phys->dd_snapshot_count;
+
+        if (quota < count)
+                err = ENOSPC;
+
+        dsl_dir_close(dd, FTAG);
+        dsl_dataset_rele(ds, FTAG);
+
+        return (err);
+}
+
+/*
+ * Check if adding additional child dataset(s) would exceed any dataset
+ * quotas.  Note that all dataset quotas up to the root dataset (i.e. the pool
+ * itself) or the given ancestor must be satisfied. When receiving we don't
+ * check if the tx is syncing. In this case, the tx is passed as NULL.
+ */
+int
+dsl_dir_dscount_check(dsl_dir_t *dd, dmu_tx_t *tx, uint64_t cnt,
+    dsl_dir_t *ancestor)
+{
+        uint64_t quota;
+        int err = 0;
+
+        VERIFY(RW_LOCK_HELD(&dd->dd_pool->dp_config_rwlock));
+
+        /*
+         * As with dsl_dataset_set_reservation_check(), don't run this check in
+         * open context.
+         */
+        if (tx != NULL && !dmu_tx_is_syncing(tx))
+                return (0);
+
+        /*
+         * If an ancestor has been provided, stop checking the quota once we
+         * hit that dir. We need this during rename, so that we don't check
+         * (and double-count) above the common ancestor, whose counts do not
+         * change.
+         */
+        if (ancestor == dd)
+                return (0);
+
+        /*
+         * If there's no value for this property, there's no need to enforce a
+         * dataset quota.
+         */
+        err = dsl_prop_get_dd(dd, zfs_prop_to_name(ZFS_PROP_DATASET_QUOTA),
+            8, 1, &quota, NULL, B_FALSE);
+        if (err == ENOENT)
+                return (0);
+        else if (err != 0)
+                return (err);
+
+#ifdef _KERNEL
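+        /* SDT probe: report the current count, the quota and the dataset */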
+        extern void __dtrace_probe_zfs__ds__quota(uint64_t, uint64_t, char *);
+        __dtrace_probe_zfs__ds__quota((uint64_t)dd->dd_phys->dd_dataset_count,
+            (uint64_t)quota, dd->dd_myname);
+#endif
+
+        if (quota > 0 && (dd->dd_phys->dd_dataset_count + cnt) > quota)
+                return (EDQUOT);
+
+        if (dd->dd_parent != NULL)
+                err = dsl_dir_dscount_check(dd->dd_parent, tx, cnt, ancestor);
+
+        return (err);
+}
+
+/*
+ * Adjust the dataset count for the specified dsl_dir_t and all parent datasets.
+ * When a new dataset is created, increment the count on all parents, and when a
+ * dataset is destroyed, decrement the count.
+ */
+void
+dsl_dir_dscount_adjust(dsl_dir_t *dd, dmu_tx_t *tx, int64_t delta,
+    boolean_t syncing, boolean_t first)
+{
+        /*
+         * On initial entry we need to check if this feature is active, but
+         * we don't want to re-check this on each recursive call. Note: the
+         * feature cannot be active if it's not enabled. If the feature is not
+         * active, don't touch the on-disk count fields.
+         */
+        if (first) {
+                dsl_dataset_t *ds = NULL;
+                spa_t *spa;
+                zfeature_info_t *quota_feat =
+                    &spa_feature_table[SPA_FEATURE_DS_SS_QUOTA];
+
+                VERIFY(0 == dsl_dataset_hold_obj(dd->dd_pool,
+                    dd->dd_phys->dd_head_dataset_obj, FTAG, &ds));
+                spa = dsl_dataset_get_spa(ds);
+                dsl_dataset_rele(ds, FTAG);
+                if (!spa_feature_is_active(spa, quota_feat))
+                        return;
+        }
+
+        VERIFY(RW_LOCK_HELD(&dd->dd_pool->dp_config_rwlock));
+        if (syncing)
+                VERIFY(dmu_tx_is_syncing(tx));
+
+        dmu_buf_will_dirty(dd->dd_dbuf, tx);
+
+        mutex_enter(&dd->dd_lock);
+
+        /*
+         * Counts may be incorrect if dealing with an existing pool and
+         * there has never been a quota set in the dataset hierarchy.
+         * This is not an error.
+         */
+        if (delta < 0 && dd->dd_phys->dd_dataset_count < (delta * -1)) {
+#ifdef _KERNEL
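+                /* SDT probe: note that a stale count was left unadjusted */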
+                extern void __dtrace_probe_zfs__dscnt__adj__neg(char *);
+                __dtrace_probe_zfs__dscnt__adj__neg(dd->dd_myname);
+#endif
+                mutex_exit(&dd->dd_lock);
+                return;
+        }
+
+        dd->dd_phys->dd_dataset_count += delta;
+
+        if (dd->dd_parent != NULL)
+                dsl_dir_dscount_adjust(dd->dd_parent, tx, delta, syncing,
+                    B_FALSE);
+
+        mutex_exit(&dd->dd_lock);
+}
+
 uint64_t
 dsl_dir_create_sync(dsl_pool_t *dp, dsl_dir_t *pds, const char *name,
     dmu_tx_t *tx)
 {
         objset_t *mos = dp->dp_meta_objset;

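One non-obvious convention in dsl_dir_validate_ds_ss_quota() above: it never
returns 0, and -1 is the "all checks passed, go set the property" result. A
hypothetical caller would handle it along these lines (a sketch only; the
actual integration point in zfs_set_prop_nvlist() is not part of this excerpt,
and dsname, intval and prop are invented names):

    err = dsl_dir_validate_ds_ss_quota(dsname, intval, prop);
    if (err != -1)
            return (err);   /* ENOTSUP, EACCES, or ENOSPC */
    /* -1: validation passed (or quota == 0); set the property normally. */
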
@@ -486,10 +857,24 @@
 
         ASSERT(RW_WRITE_HELD(&dd->dd_pool->dp_config_rwlock));
         ASSERT(dd->dd_phys->dd_head_dataset_obj == 0);
 
         /*
+         * Decrement the dataset count for all parent datasets.
+         *
+         * We have to worry about a special case where we are receiving a
+         * dataset that already exists. In this case a temporary clone name
+         * of %X is created (see dmu_recv_begin). In dmu_recv_existing_end we
+         * destroy this temporary clone which leads to here. We don't want to
+         * decrement the dataset counters in this case, since we never
+         * incremented them. To detect this case we check the tag for
+         * "tmp_dmu_recv_tag" to see if we're in that code path.
+         */
+        if (dd->dd_parent != NULL && strcmp(tag, tmp_dmu_recv_tag) != 0)
+                dsl_dir_dscount_adjust(dd->dd_parent, tx, -1, B_TRUE, B_TRUE);
+
+        /*
          * Remove our reservation. The impl() routine avoids setting the
          * actual property, which would require the (already destroyed) ds.
          */
         dsl_dir_set_reservation_sync_impl(dd, 0, tx);
 

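The tmp_dmu_recv_tag guard above keeps the counts balanced: a decrement must
pair with an earlier increment, and the temporary %X clone was never counted
on the way in. A minimal statement of that pairing, reusing quota_adjust()
from the sketch after the design comment (counted_create and counted_destroy
are invented names):

    /* Pair every counted create with at most one counted destroy. */
    static void
    counted_create(node_t *parent)
    {
            quota_adjust(parent, 1);
    }

    static void
    counted_destroy(node_t *parent, int is_tmp_recv_clone)
    {
            if (!is_tmp_recv_clone)         /* the strcmp(tag, ...) guard */
                    quota_adjust(parent, -1);
    }
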
@@ -1034,12 +1419,10 @@
         }
         mutex_exit(&dd->dd_lock);
         return (err);
 }
 
-extern dsl_syncfunc_t dsl_prop_set_sync;
-
 static void
 dsl_dir_set_quota_sync(void *arg1, void *arg2, dmu_tx_t *tx)
 {
         dsl_dataset_t *ds = arg1;
         dsl_dir_t *dd = ds->ds_dir;

@@ -1276,11 +1659,11 @@
                 /* no rename into our descendant */
                 if (closest_common_ancestor(dd, ra->newparent) == dd)
                         return (EINVAL);
 
                 if (err = dsl_dir_transfer_possible(dd->dd_parent,
-                    ra->newparent, myspace))
+                    ra->newparent, dd, myspace, tx))
                         return (err);
         }
 
         return (0);
 }

@@ -1301,10 +1684,24 @@
         dsl_dir_name(ra->newparent, namebuf);
         spa_history_log_internal_dd(dd, "rename", tx,
             "-> %s/%s", namebuf, ra->mynewname);
 
         if (ra->newparent != dd->dd_parent) {
+                int64_t cnt;
+
+                mutex_enter(&dd->dd_lock);
+
+                cnt = dd->dd_phys->dd_dataset_count + 1;
+                dsl_dir_dscount_adjust(dd->dd_parent, tx, -cnt, B_TRUE, B_TRUE);
+                dsl_dir_dscount_adjust(ra->newparent, tx, cnt, B_TRUE, B_TRUE);
+
+                cnt = dd->dd_phys->dd_snapshot_count;
+                dsl_snapcount_adjust(dd->dd_parent, tx, -cnt, B_TRUE);
+                dsl_snapcount_adjust(ra->newparent, tx, cnt, B_TRUE);
+
+                mutex_exit(&dd->dd_lock);
+
                 dsl_dir_diduse_space(dd->dd_parent, DD_USED_CHILD,
                     -dd->dd_phys->dd_used_bytes,
                     -dd->dd_phys->dd_compressed_bytes,
                     -dd->dd_phys->dd_uncompressed_bytes, tx);
                 dsl_dir_diduse_space(ra->newparent, DD_USED_CHILD,

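To make the rename arithmetic above concrete: suppose a dataset with
dd_dataset_count == 3 and dd_snapshot_count == 5 is moved to a new parent.
Then cnt is 3 + 1 == 4 (the subtree plus the dataset itself), so every dir
from the old parent up to the pool root is decremented by 4 and every dir
from the new parent up is incremented by 4; the two walks cancel at and above
the common ancestor, leaving the pool-wide total unchanged. The 5 snapshots
move the same way, without the +1, since the dataset itself contributes to
the dataset count but not to the snapshot count.
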
@@ -1373,22 +1770,35 @@
         dsl_dir_close(ra.newparent, FTAG);
         return (err);
 }
 
 int
-dsl_dir_transfer_possible(dsl_dir_t *sdd, dsl_dir_t *tdd, uint64_t space)
+dsl_dir_transfer_possible(dsl_dir_t *sdd, dsl_dir_t *tdd, dsl_dir_t *moving_dd,
+    uint64_t space, dmu_tx_t *tx)
 {
         dsl_dir_t *ancestor;
         int64_t adelta;
         uint64_t avail;
+        int err;
 
         ancestor = closest_common_ancestor(sdd, tdd);
         adelta = would_change(sdd, -space, ancestor);
         avail = dsl_dir_space_available(tdd, ancestor, adelta, FALSE);
         if (avail < space)
                 return (ENOSPC);
 
+        if (sdd != moving_dd) {
+                err = dsl_dir_dscount_check(tdd, tx,
+                    moving_dd->dd_phys->dd_dataset_count + 1, ancestor);
+                if (err != 0)
+                        return (err);
+        }
+        err = dsl_snapcount_check(tdd, tx,
+            moving_dd->dd_phys->dd_snapshot_count, ancestor);
+        if (err != 0)
+                return (err);
+
         return (0);
 }
 
 timestruc_t
 dsl_dir_snap_cmtime(dsl_dir_t *dd)