Print this page
OS-1566 dataset quota for ZFS datasets

@@ -49,10 +49,11 @@
 
 /* Set this tunable to TRUE to replace corrupt data with 0x2f5baddb10c */
 int zfs_send_corrupt_data = B_FALSE;
 
 static char *dmu_recv_tag = "dmu_recv_tag";
+char *tmp_dmu_recv_tag = "tmp_dmu_recv_tag"; /* tag marking temp ds on destroy */
 
 static int
 dump_bytes(dmu_sendarg_t *dsp, void *buf, int len)
 {
         dsl_dataset_t *ds = dsp->dsa_os->os_dsl_dataset;

@@ -641,10 +642,26 @@
                         return (EINVAL);
                 if (rbsa->origin->ds_phys->ds_guid != rbsa->fromguid)
                         return (ENODEV);
         }
 
+        /*
+         * Check dataset and snapshot quotas before receiving. We'll recheck
+         * at the end, but might as well abort before receiving if we're
+         * already over quota.
+         */
+        if (dd->dd_parent != NULL) {
+                err = dsl_dir_dscount_check(dd->dd_parent, NULL, 1, NULL);
+                if (err != 0)
+                        return (err);
+        }
+
+        err = dsl_snapcount_check(dd, tx, 1, NULL);
+        if (err != 0)
+                return (err);
+
+
         return (0);
 }
 
 static void
 recv_new_sync(void *arg1, void *arg2, dmu_tx_t *tx)

@@ -723,10 +740,15 @@
                 }
         } else {
                 /* if full, most recent snapshot must be $ORIGIN */
                 if (ds->ds_phys->ds_prev_snap_txg >= TXG_INITIAL)
                         return (ENODEV);
+
+                /* Check snapshot quota before receiving */
+                err = dsl_snapcount_check(ds->ds_dir, tx, 1, NULL);
+                if (err != 0)
+                        return (err);
         }
 
         /* temporary clone name must not exist */
         err = zap_lookup(ds->ds_dir->dd_pool->dp_meta_objset,
             ds->ds_dir->dd_phys->dd_child_dir_zapobj,

@@ -1545,27 +1567,47 @@
 
 struct recvendsyncarg {
         char *tosnap;
         uint64_t creation_time;
         uint64_t toguid;
+        boolean_t is_new; /* caller-set; gates dataset count check/adjust */
 };
 
 static int
 recv_end_check(void *arg1, void *arg2, dmu_tx_t *tx)
 {
         dsl_dataset_t *ds = arg1;
         struct recvendsyncarg *resa = arg2;
 
-        return (dsl_dataset_snapshot_check(ds, resa->tosnap, tx));
+        if (resa->is_new) {
+                /* recv is done: re-check dataset quota on the parent dir */
+                dsl_dir_t *dd;
+                int err;
+
+                dd = ds->ds_dir;
+                if (dd->dd_parent != NULL) { /* skip if no parent dir */
+                        err = dsl_dir_dscount_check(dd->dd_parent, NULL, 1,
+                            NULL);
+                        if (err != 0)
+                                return (err);
+                }
+        }

+        return (dsl_dataset_snapshot_check(ds, resa->tosnap, 1, tx));
 }
 
 static void
 recv_end_sync(void *arg1, void *arg2, dmu_tx_t *tx)
 {
         dsl_dataset_t *ds = arg1;
         struct recvendsyncarg *resa = arg2;
 
+        if (resa->is_new)
+                /* update the dataset counts */
+                dsl_dir_dscount_adjust(ds->ds_dir->dd_parent, tx, 1, B_FALSE,
+                    B_TRUE);
+
         dsl_dataset_snapshot_sync(ds, resa->tosnap, tx);
 
         /* set snapshot's creation time and guid */
         dmu_buf_will_dirty(ds->ds_prev->ds_dbuf, tx);
         ds->ds_prev->ds_phys->ds_creation_time = resa->creation_time;

@@ -1614,18 +1656,20 @@
                 if (err)
                         goto out;
         } else {
                 mutex_exit(&ds->ds_recvlock);
                 dsl_dataset_rele(ds, dmu_recv_tag);
-                (void) dsl_dataset_destroy(drc->drc_real_ds, dmu_recv_tag,
+                /* tag indicates temporary ds to dsl_dir_destroy_sync */
+                (void) dsl_dataset_destroy(drc->drc_real_ds, tmp_dmu_recv_tag,
                     B_FALSE);
                 return (EBUSY);
         }
 
         resa.creation_time = drc->drc_drrb->drr_creation_time;
         resa.toguid = drc->drc_drrb->drr_toguid;
         resa.tosnap = drc->drc_tosnap;
+        resa.is_new = B_FALSE;
 
         err = dsl_sync_task_do(ds->ds_dir->dd_pool,
             recv_end_check, recv_end_sync, ds, &resa, 3);
         if (err) {
                 /* swap back */

@@ -1635,11 +1679,13 @@
 out:
         mutex_exit(&ds->ds_recvlock);
         if (err == 0 && drc->drc_guid_to_ds_map != NULL)
                 (void) add_ds_to_guidmap(drc->drc_guid_to_ds_map, ds);
         dsl_dataset_disown(ds, dmu_recv_tag);
-        myerr = dsl_dataset_destroy(drc->drc_real_ds, dmu_recv_tag, B_FALSE);
+        /* tag indicates temporary ds to dsl_dir_destroy_sync */
+        myerr = dsl_dataset_destroy(drc->drc_real_ds, tmp_dmu_recv_tag,
+            B_FALSE);
         ASSERT0(myerr);
         return (err);
 }
 
 static int

@@ -1657,10 +1703,11 @@
         txg_wait_synced(ds->ds_dir->dd_pool, 0);
 
         resa.creation_time = drc->drc_drrb->drr_creation_time;
         resa.toguid = drc->drc_drrb->drr_toguid;
         resa.tosnap = drc->drc_tosnap;
+        resa.is_new = B_TRUE;
 
         err = dsl_sync_task_do(ds->ds_dir->dd_pool,
             recv_end_check, recv_end_sync, ds, &resa, 3);
         if (err) {
                 /* clean up the fs we just recv'd into */