1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  23  * Portions Copyright 2011 Martin Matuska
  24  * Copyright (c) 2013 by Delphix. All rights reserved.
  25  */
  26 
  27 #include <sys/zfs_context.h>
  28 #include <sys/txg_impl.h>
  29 #include <sys/dmu_impl.h>
  30 #include <sys/dmu_tx.h>
  31 #include <sys/dsl_pool.h>
  32 #include <sys/dsl_scan.h>
  33 #include <sys/callb.h>
  34 
  35 /*
  36  * ZFS Transaction Groups
  37  * ----------------------
  38  *
  39  * ZFS transaction groups are, as the name implies, groups of transactions
  40  * that act on persistent state. ZFS asserts consistency at the granularity of
  41  * these transaction groups. Each successive transaction group (txg) is
  42  * assigned a 64-bit consecutive identifier. There are three active
  43  * transaction group states: open, quiescing, or syncing. At any given time,
  44  * there may be an active txg associated with each state; each active txg may
  45  * either be processing, or blocked waiting to enter the next state. There may
  46  * be up to three active txgs, and there is always a txg in the open state
  47  * (though it may be blocked waiting to enter the quiescing state). In broad
  48  * strokes, transactions -- operations that change in-memory structures -- are
  49  * accepted into the txg in the open state, and are completed while the txg is
  50  * in the open or quiescing states. The accumulated changes are written to
  51  * disk in the syncing state.
  52  *
  53  * Open
  54  *
  55  * When a new txg becomes active, it first enters the open state. New
  56  * transactions -- updates to in-memory structures -- are assigned to the
  57  * currently open txg. There is always a txg in the open state so that ZFS can
  58  * accept new changes (though the txg may refuse new changes if it has hit
  59  * some limit). ZFS advances the open txg to the next state for a variety of
  60  * reasons such as it hitting a time or size threshold, or the execution of an
  61  * administrative action that must be completed in the syncing state.
  62  *
  63  * Quiescing
  64  *
  65  * After a txg exits the open state, it enters the quiescing state. The
  66  * quiescing state is intended to provide a buffer between accepting new
  67  * transactions in the open state and writing them out to stable storage in
  68  * the syncing state. While quiescing, transactions can continue their
  69  * operation without delaying either of the other states. Typically, a txg is
  70  * in the quiescing state very briefly since the operations are bounded by
  71  * software latencies rather than, say, slower I/O latencies. After all
  72  * transactions complete, the txg is ready to enter the next state.
  73  *
  74  * Syncing
  75  *
  76  * In the syncing state, the in-memory state built up during the open and (to
  77  * a lesser degree) the quiescing states is written to stable storage. The
 * process of writing out modified data can, in turn, modify more data. For
 * example, when we write new blocks, we need to allocate space for them; those
  80  * allocations modify metadata (space maps)... which themselves must be
  81  * written to stable storage. During the sync state, ZFS iterates, writing out
  82  * data until it converges and all in-memory changes have been written out.
  83  * The first such pass is the largest as it encompasses all the modified user
  84  * data (as opposed to filesystem metadata). Subsequent passes typically have
  85  * far less data to write as they consist exclusively of filesystem metadata.
  86  *
  87  * To ensure convergence, after a certain number of passes ZFS begins
  88  * overwriting locations on stable storage that had been allocated earlier in
  89  * the syncing state (and subsequently freed). ZFS usually allocates new
  90  * blocks to optimize for large, continuous, writes. For the syncing state to
  91  * converge however it must complete a pass where no new blocks are allocated
  92  * since each allocation requires a modification of persistent metadata.
  93  * Further, to hasten convergence, after a prescribed number of passes, ZFS
  94  * also defers frees, and stops compressing.
  95  *
  96  * In addition to writing out user data, we must also execute synctasks during
  97  * the syncing context. A synctask is the mechanism by which some
  98  * administrative activities work such as creating and destroying snapshots or
  99  * datasets. Note that when a synctask is initiated it enters the open txg,
 100  * and ZFS then pushes that txg as quickly as possible to completion of the
 101  * syncing state in order to reduce the latency of the administrative
 102  * activity. To complete the syncing state, ZFS writes out a new uberblock,
 103  * the root of the tree of blocks that comprise all state stored on the ZFS
 104  * pool. Finally, if there is a quiesced txg waiting, we signal that it can
 105  * now transition to the syncing state.
 106  */
 107 
 108 static void txg_sync_thread(dsl_pool_t *dp);
 109 static void txg_quiesce_thread(dsl_pool_t *dp);
 110 
 111 int zfs_txg_timeout = 5;        /* max seconds worth of delta per txg */
 112 
 113 /*
 114  * Prepare the txg subsystem.
 115  */
 116 void
 117 txg_init(dsl_pool_t *dp, uint64_t txg)
 118 {
 119         tx_state_t *tx = &dp->dp_tx;
 120         int c;
 121         bzero(tx, sizeof (tx_state_t));
 122 
 123         tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);
 124 
 125         for (c = 0; c < max_ncpus; c++) {
 126                 int i;
 127 
 128                 mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
 129                 mutex_init(&tx->tx_cpu[c].tc_open_lock, NULL, MUTEX_DEFAULT,
 130                     NULL);
 131                 for (i = 0; i < TXG_SIZE; i++) {
 132                         cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
 133                             NULL);
 134                         list_create(&tx->tx_cpu[c].tc_callbacks[i],
 135                             sizeof (dmu_tx_callback_t),
 136                             offsetof(dmu_tx_callback_t, dcb_node));
 137                 }
 138         }
 139 
 140         mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);
 141 
 142         cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
 143         cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
 144         cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
 145         cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
 146         cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);
 147 
 148         tx->tx_open_txg = txg;
 149 }
 150 
 151 /*
 152  * Close down the txg subsystem.
 153  */
 154 void
 155 txg_fini(dsl_pool_t *dp)
 156 {
 157         tx_state_t *tx = &dp->dp_tx;
 158         int c;
 159 
 160         ASSERT(tx->tx_threads == 0);
 161 
 162         mutex_destroy(&tx->tx_sync_lock);
 163 
 164         cv_destroy(&tx->tx_sync_more_cv);
 165         cv_destroy(&tx->tx_sync_done_cv);
 166         cv_destroy(&tx->tx_quiesce_more_cv);
 167         cv_destroy(&tx->tx_quiesce_done_cv);
 168         cv_destroy(&tx->tx_exit_cv);
 169 
 170         for (c = 0; c < max_ncpus; c++) {
 171                 int i;
 172 
 173                 mutex_destroy(&tx->tx_cpu[c].tc_open_lock);
 174                 mutex_destroy(&tx->tx_cpu[c].tc_lock);
 175                 for (i = 0; i < TXG_SIZE; i++) {
 176                         cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
 177                         list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
 178                 }
 179         }
 180 
 181         if (tx->tx_commit_cb_taskq != NULL)
 182                 taskq_destroy(tx->tx_commit_cb_taskq);
 183 
 184         kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));
 185 
 186         bzero(tx, sizeof (tx_state_t));
 187 }
 188 
 189 /*
 190  * Start syncing transaction groups.
 191  */
 192 void
 193 txg_sync_start(dsl_pool_t *dp)
 194 {
 195         tx_state_t *tx = &dp->dp_tx;
 196 
 197         mutex_enter(&tx->tx_sync_lock);
 198 
 199         dprintf("pool %p\n", dp);
 200 
 201         ASSERT(tx->tx_threads == 0);
 202 
 203         tx->tx_threads = 2;
 204 
 205         tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
 206             dp, 0, &p0, TS_RUN, minclsyspri);
 207 
 208         /*
 209          * The sync thread can need a larger-than-default stack size on
 210          * 32-bit x86.  This is due in part to nested pools and
 211          * scrub_visitbp() recursion.
 212          */
 213         tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread,
 214             dp, 0, &p0, TS_RUN, minclsyspri);
 215 
 216         mutex_exit(&tx->tx_sync_lock);
 217 }
 218 
 219 static void
 220 txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
 221 {
 222         CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
 223         mutex_enter(&tx->tx_sync_lock);
 224 }
 225 
 226 static void
 227 txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
 228 {
 229         ASSERT(*tpp != NULL);
 230         *tpp = NULL;
 231         tx->tx_threads--;
 232         cv_broadcast(&tx->tx_exit_cv);
 233         CALLB_CPR_EXIT(cpr);            /* drops &tx->tx_sync_lock */
 234         thread_exit();
 235 }
 236 
 237 static void
 238 txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, clock_t time)
 239 {
 240         CALLB_CPR_SAFE_BEGIN(cpr);
 241 
 242         if (time)
 243                 (void) cv_timedwait(cv, &tx->tx_sync_lock,
 244                     ddi_get_lbolt() + time);
 245         else
 246                 cv_wait(cv, &tx->tx_sync_lock);
 247 
 248         CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
 249 }
 250 
 251 /*
 252  * Stop syncing transaction groups.
 253  */
 254 void
 255 txg_sync_stop(dsl_pool_t *dp)
 256 {
 257         tx_state_t *tx = &dp->dp_tx;
 258 
 259         dprintf("pool %p\n", dp);
 260         /*
 261          * Finish off any work in progress.
 262          */
 263         ASSERT(tx->tx_threads == 2);
 264 
 265         /*
 266          * We need to ensure that we've vacated the deferred space_maps.
 267          */
 268         txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);
 269 
 270         /*
 271          * Wake all sync threads and wait for them to die.
 272          */
 273         mutex_enter(&tx->tx_sync_lock);
 274 
 275         ASSERT(tx->tx_threads == 2);
 276 
 277         tx->tx_exiting = 1;
 278 
 279         cv_broadcast(&tx->tx_quiesce_more_cv);
 280         cv_broadcast(&tx->tx_quiesce_done_cv);
 281         cv_broadcast(&tx->tx_sync_more_cv);
 282 
 283         while (tx->tx_threads != 0)
 284                 cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);
 285 
 286         tx->tx_exiting = 0;
 287 
 288         mutex_exit(&tx->tx_sync_lock);
 289 }
 290 
 291 uint64_t
 292 txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
 293 {
 294         tx_state_t *tx = &dp->dp_tx;
 295         tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
 296         uint64_t txg;
 297 
 298         mutex_enter(&tc->tc_open_lock);
 299         txg = tx->tx_open_txg;
 300 
 301         mutex_enter(&tc->tc_lock);
 302         tc->tc_count[txg & TXG_MASK]++;
 303         mutex_exit(&tc->tc_lock);
 304 
 305         th->th_cpu = tc;
 306         th->th_txg = txg;
 307 
 308         return (txg);
 309 }
 310 
 311 void
 312 txg_rele_to_quiesce(txg_handle_t *th)
 313 {
 314         tx_cpu_t *tc = th->th_cpu;
 315 
 316         ASSERT(!MUTEX_HELD(&tc->tc_lock));
 317         mutex_exit(&tc->tc_open_lock);
 318 }
 319 
 320 void
 321 txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
 322 {
 323         tx_cpu_t *tc = th->th_cpu;
 324         int g = th->th_txg & TXG_MASK;
 325 
 326         mutex_enter(&tc->tc_lock);
 327         list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
 328         mutex_exit(&tc->tc_lock);
 329 }
 330 
 331 void
 332 txg_rele_to_sync(txg_handle_t *th)
 333 {
 334         tx_cpu_t *tc = th->th_cpu;
 335         int g = th->th_txg & TXG_MASK;
 336 
 337         mutex_enter(&tc->tc_lock);
 338         ASSERT(tc->tc_count[g] != 0);
 339         if (--tc->tc_count[g] == 0)
 340                 cv_broadcast(&tc->tc_cv[g]);
 341         mutex_exit(&tc->tc_lock);
 342 
 343         th->th_cpu = NULL;   /* defensive */
 344 }
 345 
 346 /*
 347  * Blocks until all transactions in the group are committed.
 348  *
 349  * On return, the transaction group has reached a stable state in which it can
 350  * then be passed off to the syncing context.
 351  */
 352 static void
 353 txg_quiesce(dsl_pool_t *dp, uint64_t txg)
 354 {
 355         tx_state_t *tx = &dp->dp_tx;
 356         int g = txg & TXG_MASK;
 357         int c;
 358 
 359         /*
 360          * Grab all tc_open_locks so nobody else can get into this txg.
 361          */
 362         for (c = 0; c < max_ncpus; c++)
 363                 mutex_enter(&tx->tx_cpu[c].tc_open_lock);
 364 
 365         ASSERT(txg == tx->tx_open_txg);
 366         tx->tx_open_txg++;
 367         tx->tx_open_time = gethrtime();
 368 
 369         DTRACE_PROBE2(txg__quiescing, dsl_pool_t *, dp, uint64_t, txg);
 370         DTRACE_PROBE2(txg__opened, dsl_pool_t *, dp, uint64_t, tx->tx_open_txg);
 371 
 372         /*
 373          * Now that we've incremented tx_open_txg, we can let threads
 374          * enter the next transaction group.
 375          */
 376         for (c = 0; c < max_ncpus; c++)
 377                 mutex_exit(&tx->tx_cpu[c].tc_open_lock);
 378 
 379         /*
 380          * Quiesce the transaction group by waiting for everyone to txg_exit().
 381          */
 382         for (c = 0; c < max_ncpus; c++) {
 383                 tx_cpu_t *tc = &tx->tx_cpu[c];
 384                 mutex_enter(&tc->tc_lock);
 385                 while (tc->tc_count[g] != 0)
 386                         cv_wait(&tc->tc_cv[g], &tc->tc_lock);
 387                 mutex_exit(&tc->tc_lock);
 388         }
 389 }
 390 
 391 static void
 392 txg_do_callbacks(list_t *cb_list)
 393 {
 394         dmu_tx_do_callbacks(cb_list, 0);
 395 
 396         list_destroy(cb_list);
 397 
 398         kmem_free(cb_list, sizeof (list_t));
 399 }
 400 
 401 /*
 402  * Dispatch the commit callbacks registered on this txg to worker threads.
 403  *
 404  * If no callbacks are registered for a given TXG, nothing happens.
 405  * This function creates a taskq for the associated pool, if needed.
 406  */
 407 static void
 408 txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
 409 {
 410         int c;
 411         tx_state_t *tx = &dp->dp_tx;
 412         list_t *cb_list;
 413 
 414         for (c = 0; c < max_ncpus; c++) {
 415                 tx_cpu_t *tc = &tx->tx_cpu[c];
 416                 /*
 417                  * No need to lock tx_cpu_t at this point, since this can
 418                  * only be called once a txg has been synced.
 419                  */
 420 
 421                 int g = txg & TXG_MASK;
 422 
 423                 if (list_is_empty(&tc->tc_callbacks[g]))
 424                         continue;
 425 
 426                 if (tx->tx_commit_cb_taskq == NULL) {
 427                         /*
 428                          * Commit callback taskq hasn't been created yet.
 429                          */
 430                         tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
 431                             max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
 432                             TASKQ_PREPOPULATE);
 433                 }
 434 
 435                 cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
 436                 list_create(cb_list, sizeof (dmu_tx_callback_t),
 437                     offsetof(dmu_tx_callback_t, dcb_node));
 438 
 439                 list_move_tail(cb_list, &tc->tc_callbacks[g]);
 440 
 441                 (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
 442                     txg_do_callbacks, cb_list, TQ_SLEEP);
 443         }
 444 }
 445 
 446 static void
 447 txg_sync_thread(dsl_pool_t *dp)
 448 {
 449         spa_t *spa = dp->dp_spa;
 450         tx_state_t *tx = &dp->dp_tx;
 451         callb_cpr_t cpr;
 452         uint64_t start, delta;
 453 
 454         txg_thread_enter(tx, &cpr);
 455 
 456         start = delta = 0;
 457         for (;;) {
 458                 uint64_t timeout = zfs_txg_timeout * hz;
 459                 uint64_t timer;
 460                 uint64_t txg;
 461 
 462                 /*
 463                  * We sync when we're scanning, there's someone waiting
 464                  * on us, or the quiesce thread has handed off a txg to
 465                  * us, or we have reached our timeout.
 466                  */
 467                 timer = (delta >= timeout ? 0 : timeout - delta);
 468                 while (!dsl_scan_active(dp->dp_scan) &&
 469                     !tx->tx_exiting && timer > 0 &&
 470                     tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
 471                     tx->tx_quiesced_txg == 0 &&
 472                     dp->dp_dirty_total < zfs_dirty_data_sync) {
 473                         dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
 474                             tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
 475                         txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
 476                         delta = ddi_get_lbolt() - start;
 477                         timer = (delta > timeout ? 0 : timeout - delta);
 478                 }
 479 
 480                 /*
 481                  * Wait until the quiesce thread hands off a txg to us,
 482                  * prompting it to do so if necessary.
 483                  */
 484                 while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
 485                         if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
 486                                 tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
 487                         cv_broadcast(&tx->tx_quiesce_more_cv);
 488                         txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
 489                 }
 490 
 491                 if (tx->tx_exiting)
 492                         txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);
 493 
 494                 /*
 495                  * Consume the quiesced txg which has been handed off to
 496                  * us.  This may cause the quiescing thread to now be
 497                  * able to quiesce another txg, so we must signal it.
 498                  */
 499                 txg = tx->tx_quiesced_txg;
 500                 tx->tx_quiesced_txg = 0;
 501                 tx->tx_syncing_txg = txg;
 502                 DTRACE_PROBE2(txg__syncing, dsl_pool_t *, dp, uint64_t, txg);
 503                 cv_broadcast(&tx->tx_quiesce_more_cv);
 504 
 505                 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
 506                     txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
 507                 mutex_exit(&tx->tx_sync_lock);
 508 
 509                 start = ddi_get_lbolt();
 510                 spa_sync(spa, txg);
 511                 delta = ddi_get_lbolt() - start;
 512 
 513                 mutex_enter(&tx->tx_sync_lock);
 514                 tx->tx_synced_txg = txg;
 515                 tx->tx_syncing_txg = 0;
 516                 DTRACE_PROBE2(txg__synced, dsl_pool_t *, dp, uint64_t, txg);
 517                 cv_broadcast(&tx->tx_sync_done_cv);
 518 
 519                 /*
 520                  * Dispatch commit callbacks to worker threads.
 521                  */
 522                 txg_dispatch_callbacks(dp, txg);
 523         }
 524 }
 525 
 526 static void
 527 txg_quiesce_thread(dsl_pool_t *dp)
 528 {
 529         tx_state_t *tx = &dp->dp_tx;
 530         callb_cpr_t cpr;
 531 
 532         txg_thread_enter(tx, &cpr);
 533 
 534         for (;;) {
 535                 uint64_t txg;
 536 
 537                 /*
 538                  * We quiesce when there's someone waiting on us.
 539                  * However, we can only have one txg in "quiescing" or
 540                  * "quiesced, waiting to sync" state.  So we wait until
 541                  * the "quiesced, waiting to sync" txg has been consumed
 542                  * by the sync thread.
 543                  */
 544                 while (!tx->tx_exiting &&
 545                     (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
 546                     tx->tx_quiesced_txg != 0))
 547                         txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);
 548 
 549                 if (tx->tx_exiting)
 550                         txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);
 551 
 552                 txg = tx->tx_open_txg;
 553                 dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
 554                     txg, tx->tx_quiesce_txg_waiting,
 555                     tx->tx_sync_txg_waiting);
 556                 mutex_exit(&tx->tx_sync_lock);
 557                 txg_quiesce(dp, txg);
 558                 mutex_enter(&tx->tx_sync_lock);
 559 
 560                 /*
 561                  * Hand this txg off to the sync thread.
 562                  */
 563                 dprintf("quiesce done, handing off txg %llu\n", txg);
 564                 tx->tx_quiesced_txg = txg;
 565                 DTRACE_PROBE2(txg__quiesced, dsl_pool_t *, dp, uint64_t, txg);
 566                 cv_broadcast(&tx->tx_sync_more_cv);
 567                 cv_broadcast(&tx->tx_quiesce_done_cv);
 568         }
 569 }
 570 
 571 /*
 572  * Delay this thread by delay nanoseconds if we are still in the open
 573  * transaction group and there is already a waiting txg quiescing or quiesced.
 574  * Abort the delay if this txg stalls or enters the quiescing state.
 575  */
 576 void
 577 txg_delay(dsl_pool_t *dp, uint64_t txg, hrtime_t delay, hrtime_t resolution)
 578 {
 579         tx_state_t *tx = &dp->dp_tx;
 580         hrtime_t start = gethrtime();
 581 
 582         /* don't delay if this txg could transition to quiescing immediately */
 583         if (tx->tx_open_txg > txg ||
 584             tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
 585                 return;
 586 
 587         mutex_enter(&tx->tx_sync_lock);
 588         if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
 589                 mutex_exit(&tx->tx_sync_lock);
 590                 return;
 591         }
 592 
 593         while (gethrtime() - start < delay &&
 594             tx->tx_syncing_txg < txg-1 && !txg_stalled(dp)) {
 595                 (void) cv_timedwait_hires(&tx->tx_quiesce_more_cv,
 596                     &tx->tx_sync_lock, delay, resolution, 0);
 597         }
 598 
 599         mutex_exit(&tx->tx_sync_lock);
 600 }
 601 
 602 void
 603 txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
 604 {
 605         tx_state_t *tx = &dp->dp_tx;
 606 
 607         ASSERT(!dsl_pool_config_held(dp));
 608 
 609         mutex_enter(&tx->tx_sync_lock);
 610         ASSERT(tx->tx_threads == 2);
 611         if (txg == 0)
 612                 txg = tx->tx_open_txg + TXG_DEFER_SIZE;
 613         if (tx->tx_sync_txg_waiting < txg)
 614                 tx->tx_sync_txg_waiting = txg;
 615         dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
 616             txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
 617         while (tx->tx_synced_txg < txg) {
 618                 dprintf("broadcasting sync more "
 619                     "tx_synced=%llu waiting=%llu dp=%p\n",
 620                     tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
 621                 cv_broadcast(&tx->tx_sync_more_cv);
 622                 cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
 623         }
 624         mutex_exit(&tx->tx_sync_lock);
 625 }
 626 
 627 void
 628 txg_wait_open(dsl_pool_t *dp, uint64_t txg)
 629 {
 630         tx_state_t *tx = &dp->dp_tx;
 631 
 632         ASSERT(!dsl_pool_config_held(dp));
 633 
 634         mutex_enter(&tx->tx_sync_lock);
 635         ASSERT(tx->tx_threads == 2);
 636         if (txg == 0)
 637                 txg = tx->tx_open_txg + 1;
 638         if (tx->tx_quiesce_txg_waiting < txg)
 639                 tx->tx_quiesce_txg_waiting = txg;
 640         dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
 641             txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
 642         while (tx->tx_open_txg < txg) {
 643                 cv_broadcast(&tx->tx_quiesce_more_cv);
 644                 cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
 645         }
 646         mutex_exit(&tx->tx_sync_lock);
 647 }
 648 
 649 /*
 650  * If there isn't a txg syncing or in the pipeline, push another txg through
 651  * the pipeline by queiscing the open txg.
 652  */
 653 void
 654 txg_kick(dsl_pool_t *dp)
 655 {
 656         tx_state_t *tx = &dp->dp_tx;
 657 
 658         ASSERT(!dsl_pool_config_held(dp));
 659 
 660         mutex_enter(&tx->tx_sync_lock);
 661         if (tx->tx_syncing_txg == 0 &&
 662             tx->tx_quiesce_txg_waiting <= tx->tx_open_txg &&
 663             tx->tx_sync_txg_waiting <= tx->tx_synced_txg &&
 664             tx->tx_quiesced_txg <= tx->tx_synced_txg) {
 665                 tx->tx_quiesce_txg_waiting = tx->tx_open_txg + 1;
 666                 cv_broadcast(&tx->tx_quiesce_more_cv);
 667         }
 668         mutex_exit(&tx->tx_sync_lock);
 669 }
 670 
 671 boolean_t
 672 txg_stalled(dsl_pool_t *dp)
 673 {
 674         tx_state_t *tx = &dp->dp_tx;
 675         return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
 676 }
 677 
 678 boolean_t
 679 txg_sync_waiting(dsl_pool_t *dp)
 680 {
 681         tx_state_t *tx = &dp->dp_tx;
 682 
 683         return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
 684             tx->tx_quiesced_txg != 0);
 685 }
 686 
 687 /*
 688  * Per-txg object lists.
 689  */
 690 void
 691 txg_list_create(txg_list_t *tl, size_t offset)
 692 {
 693         int t;
 694 
 695         mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);
 696 
 697         tl->tl_offset = offset;
 698 
 699         for (t = 0; t < TXG_SIZE; t++)
 700                 tl->tl_head[t] = NULL;
 701 }
 702 
 703 void
 704 txg_list_destroy(txg_list_t *tl)
 705 {
 706         int t;
 707 
 708         for (t = 0; t < TXG_SIZE; t++)
 709                 ASSERT(txg_list_empty(tl, t));
 710 
 711         mutex_destroy(&tl->tl_lock);
 712 }
 713 
 714 boolean_t
 715 txg_list_empty(txg_list_t *tl, uint64_t txg)
 716 {
 717         return (tl->tl_head[txg & TXG_MASK] == NULL);
 718 }
 719 
 720 /*
 721  * Add an entry to the list (unless it's already on the list).
 722  * Returns B_TRUE if it was actually added.
 723  */
 724 boolean_t
 725 txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
 726 {
 727         int t = txg & TXG_MASK;
 728         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
 729         boolean_t add;
 730 
 731         mutex_enter(&tl->tl_lock);
 732         add = (tn->tn_member[t] == 0);
 733         if (add) {
 734                 tn->tn_member[t] = 1;
 735                 tn->tn_next[t] = tl->tl_head[t];
 736                 tl->tl_head[t] = tn;
 737         }
 738         mutex_exit(&tl->tl_lock);
 739 
 740         return (add);
 741 }
 742 
 743 /*
 744  * Add an entry to the end of the list, unless it's already on the list.
 745  * (walks list to find end)
 746  * Returns B_TRUE if it was actually added.
 747  */
 748 boolean_t
 749 txg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg)
 750 {
 751         int t = txg & TXG_MASK;
 752         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
 753         boolean_t add;
 754 
 755         mutex_enter(&tl->tl_lock);
 756         add = (tn->tn_member[t] == 0);
 757         if (add) {
 758                 txg_node_t **tp;
 759 
 760                 for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t])
 761                         continue;
 762 
 763                 tn->tn_member[t] = 1;
 764                 tn->tn_next[t] = NULL;
 765                 *tp = tn;
 766         }
 767         mutex_exit(&tl->tl_lock);
 768 
 769         return (add);
 770 }
 771 
 772 /*
 773  * Remove the head of the list and return it.
 774  */
 775 void *
 776 txg_list_remove(txg_list_t *tl, uint64_t txg)
 777 {
 778         int t = txg & TXG_MASK;
 779         txg_node_t *tn;
 780         void *p = NULL;
 781 
 782         mutex_enter(&tl->tl_lock);
 783         if ((tn = tl->tl_head[t]) != NULL) {
 784                 p = (char *)tn - tl->tl_offset;
 785                 tl->tl_head[t] = tn->tn_next[t];
 786                 tn->tn_next[t] = NULL;
 787                 tn->tn_member[t] = 0;
 788         }
 789         mutex_exit(&tl->tl_lock);
 790 
 791         return (p);
 792 }
 793 
 794 /*
 795  * Remove a specific item from the list and return it.
 796  */
 797 void *
 798 txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
 799 {
 800         int t = txg & TXG_MASK;
 801         txg_node_t *tn, **tp;
 802 
 803         mutex_enter(&tl->tl_lock);
 804 
 805         for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
 806                 if ((char *)tn - tl->tl_offset == p) {
 807                         *tp = tn->tn_next[t];
 808                         tn->tn_next[t] = NULL;
 809                         tn->tn_member[t] = 0;
 810                         mutex_exit(&tl->tl_lock);
 811                         return (p);
 812                 }
 813         }
 814 
 815         mutex_exit(&tl->tl_lock);
 816 
 817         return (NULL);
 818 }
 819 
 820 boolean_t
 821 txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
 822 {
 823         int t = txg & TXG_MASK;
 824         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
 825 
 826         return (tn->tn_member[t] != 0);
 827 }
 828 
 829 /*
 830  * Walk a txg list -- only safe if you know it's not changing.
 831  */
 832 void *
 833 txg_list_head(txg_list_t *tl, uint64_t txg)
 834 {
 835         int t = txg & TXG_MASK;
 836         txg_node_t *tn = tl->tl_head[t];
 837 
 838         return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
 839 }
 840 
 841 void *
 842 txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
 843 {
 844         int t = txg & TXG_MASK;
 845         txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
 846 
 847         tn = tn->tn_next[t];
 848 
 849         return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
 850 }