Print this page
OS-1566 dataset quota for ZFS datasets

@@ -19,10 +19,11 @@
  * CDDL HEADER END
  */
 /*
  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  * Copyright (c) 2012 by Delphix. All rights reserved.
+ * Copyright (c) 2012, Joyent, Inc. All rights reserved.
  */
 
 /* Portions Copyright 2010 Robert Milkowski */
 
 #include <sys/cred.h>

@@ -688,11 +689,11 @@
                 /* You can only clone snapshots, not the head datasets. */
                 if (!dsl_dataset_is_snapshot(oa->clone_origin))
                         return (EINVAL);
         }
 
-        return (0);
+        return (dsl_dir_dscount_check(dd, tx, 1, NULL));
 }
 
 static void
 dmu_objset_create_sync(void *arg1, void *arg2, dmu_tx_t *tx)
 {

@@ -703,10 +704,12 @@
         dsl_dataset_t *ds;
         blkptr_t *bp;
 
         ASSERT(dmu_tx_is_syncing(tx));
 
+        dsl_dir_dscount_adjust(dd, tx, 1, B_TRUE, B_TRUE);
+
         obj = dsl_dataset_create_sync(dd, oa->lastname,
             oa->clone_origin, oa->flags, oa->cr, tx);
 
         VERIFY3U(0, ==, dsl_dataset_hold_obj(dd->dd_pool, obj, FTAG, &ds));
         bp = dsl_dataset_get_blkptr(ds);

@@ -805,10 +808,11 @@
 
 typedef struct snapallarg {
         dsl_sync_task_group_t *saa_dstg;
         boolean_t saa_needsuspend;
         nvlist_t *saa_props;
+        uint64_t saa_tot_cnt;
 
         /* the following are used only if 'temporary' is set: */
         boolean_t saa_temporary;
         const char *saa_htag;
         struct dsl_ds_holdarg *saa_ha;

@@ -829,12 +833,48 @@
         snapallarg_t *saa = soa->soa_saa;
         int error;
 
         /* The props have already been checked by zfs_check_userprops(). */
 
+        /*
+         * The saa_tot_cnt is used to track how many snapshots there are going
+         * to be at the highest level of the snapshot tree. This is necessary
+         * because the counts are not actually adjusted when we are checking,
+         * only when we finally sync. For a single snapshot this is easy (the
+         * count is 1), but it gets more complicated for recursive snapshots.
+         *
+         * We only enforce the snapshot quota at the level where the snapshot
+         * is being taken. This is to prevent datasets with a full snapshot
+         * count at a lower level from blocking recursive snapshots being taken
+         * at a higher level. For example, the quota is only enforced on 'a'
+         * and 'b' when taking a recursive snapshot of a/b@x with the following
+         * existing state:
+         *      a/b     (0 snaps, snap quota is 5)
+         *      a/b/c   (0 snaps, snap quota is none)
+         *      a/b/d   (1 snaps, snap quota is 1)
+         * A recursive snapshot of a/b will be allowed since it results in
+         * 3 new snapshots (a/b@x, a/b/c@x, a/b/d@x), even though a/b/d already
+         * has 1 snapshot and has hit its quota (note that the existing
+         * snapshot on a/b/d is being counted against the quota on a/b). When
+         * the snapshot completes, a/b will have a snapshot count of 4 and
+         * a/b/d will have a count of 2. As can be seen, this means that
+         * datasets can have a snapshot count > their quota.
+         *
+         * In order to properly handle recursive snapshots, we increment the
+         * total count in open context, but this count is not validated in open
+         * context. This gives us the maximum count to validate at the
+         * top-level dataset when we're in syncing context. We then use a count
+         * of 0 in syncing context as we descend the tree past the top-level
+         * snapshot so that lower levels are not being validated against their
+         * quota.
+         */
+        if (!dmu_tx_is_syncing(tx))
+                saa->saa_tot_cnt++;
         error = dsl_dataset_snapshot_check(os->os_dsl_dataset,
-            soa->soa_snapname, tx);
+            soa->soa_snapname, saa->saa_tot_cnt, tx);
+        if (dmu_tx_is_syncing(tx))
+                saa->saa_tot_cnt = 0;
         if (error)
                 return (error);
 
         if (saa->saa_temporary) {
                 /*