/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Portions Copyright 2011 Martin Matuska
 * Copyright (c) 2013 by Delphix. All rights reserved.
 */

#include <sys/zfs_context.h>
#include <sys/txg_impl.h>
#include <sys/dmu_impl.h>
#include <sys/dmu_tx.h>
#include <sys/dsl_pool.h>
#include <sys/dsl_scan.h>
#include <sys/callb.h>

/*
 * ZFS Transaction Groups
 * ----------------------
 *
 * ZFS transaction groups are, as the name implies, groups of transactions
 * that act on persistent state. ZFS asserts consistency at the granularity of
 * these transaction groups. Each successive transaction group (txg) is
 * assigned a consecutive 64-bit identifier. There are three active
 * transaction group states: open, quiescing, or syncing. At any given time,
 * there may be an active txg associated with each state; each active txg may
 * either be processing or blocked waiting to enter the next state. There may
 * be up to three active txgs, and there is always a txg in the open state
 * (though it may be blocked waiting to enter the quiescing state). In broad
 * strokes, transactions -- operations that change in-memory structures -- are
 * accepted into the txg in the open state, and are completed while the txg is
 * in the open or quiescing states. The accumulated changes are written to
 * disk in the syncing state.
 *
 * Open
 *
 * When a new txg becomes active, it first enters the open state. New
 * transactions -- updates to in-memory structures -- are assigned to the
 * currently open txg. There is always a txg in the open state so that ZFS can
 * accept new changes (though the txg may refuse new changes if it has hit
 * some limit). ZFS advances the open txg to the next state for a variety of
 * reasons, such as the txg hitting a time or size threshold, or the execution
 * of an administrative action that must be completed in the syncing state.
 *
 * Quiescing
 *
 * After a txg exits the open state, it enters the quiescing state. The
 * quiescing state is intended to provide a buffer between accepting new
 * transactions in the open state and writing them out to stable storage in
 * the syncing state. While quiescing, transactions can continue their
 * operation without delaying either of the other states. Typically, a txg is
 * in the quiescing state very briefly since the operations are bounded by
 * software latencies rather than, say, slower I/O latencies. After all
 * transactions complete, the txg is ready to enter the next state.
 *
 * Syncing
 *
 * In the syncing state, the in-memory state built up during the open and (to
 * a lesser degree) the quiescing states is written to stable storage. The
 * process of writing out modified data can, in turn, modify more data. For
 * example, when we write new blocks, we need to allocate space for them;
 * those allocations modify metadata (space maps)... which themselves must be
 * written to stable storage. During the sync state, ZFS iterates, writing
 * out data until it converges and all in-memory changes have been written
 * out. The first such pass is the largest as it encompasses all the modified
 * user data (as opposed to filesystem metadata). Subsequent passes typically
 * have far less data to write as they consist exclusively of filesystem
 * metadata.
 *
 * To ensure convergence, after a certain number of passes ZFS begins
 * overwriting locations on stable storage that had been allocated earlier in
 * the syncing state (and subsequently freed). ZFS usually allocates new
 * blocks to optimize for large, continuous writes. For the syncing state to
 * converge, however, it must complete a pass where no new blocks are
 * allocated, since each allocation requires a modification of persistent
 * metadata. Further, to hasten convergence, after a prescribed number of
 * passes ZFS also defers frees and stops compressing.
 *
 * In addition to writing out user data, we must also execute synctasks during
 * the syncing context. A synctask is the mechanism by which some
 * administrative activities work, such as creating and destroying snapshots
 * or datasets. Note that when a synctask is initiated, it enters the open
 * txg, and ZFS then pushes that txg as quickly as possible to completion of
 * the syncing state in order to reduce the latency of the administrative
 * activity. To complete the syncing state, ZFS writes out a new uberblock,
 * the root of the tree of blocks that comprise all state stored on the ZFS
 * pool. Finally, if there is a quiesced txg waiting, we signal that it can
 * now transition to the syncing state.
 */
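
/*
 * To make the lifecycle above concrete: a typical consumer's change enters
 * the currently open txg through the dmu_tx_*() interfaces in dmu_tx.c, which
 * in turn use the txg_hold_open()/txg_rele_*() calls below. This is an
 * illustrative sketch only (os, object, offset, size and buf are
 * placeholders), not compiled code:
 *
 *	dmu_tx_t *tx = dmu_tx_create(os);
 *	dmu_tx_hold_write(tx, object, offset, size);
 *	error = dmu_tx_assign(tx, TXG_WAIT);	-- joins the open txg
 *	if (error != 0) {
 *		dmu_tx_abort(tx);
 *		return (error);
 *	}
 *	dmu_write(os, object, offset, size, buf, tx);
 *	dmu_tx_commit(tx);			-- txg may now finish quiescing
 *
 * The change becomes durable once the txg returned by dmu_tx_get_txg(tx) has
 * completed the syncing state (e.g. once txg_wait_synced() returns for that
 * txg).
 */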

static void txg_sync_thread(dsl_pool_t *dp);
static void txg_quiesce_thread(dsl_pool_t *dp);

int zfs_txg_timeout = 5;        /* max seconds worth of delta per txg */

/*
 * Prepare the txg subsystem.
 */
void
txg_init(dsl_pool_t *dp, uint64_t txg)
{
        tx_state_t *tx = &dp->dp_tx;
        int c;
        bzero(tx, sizeof (tx_state_t));

        tx->tx_cpu = kmem_zalloc(max_ncpus * sizeof (tx_cpu_t), KM_SLEEP);

        for (c = 0; c < max_ncpus; c++) {
                int i;

                mutex_init(&tx->tx_cpu[c].tc_lock, NULL, MUTEX_DEFAULT, NULL);
                mutex_init(&tx->tx_cpu[c].tc_open_lock, NULL, MUTEX_DEFAULT,
                    NULL);
                for (i = 0; i < TXG_SIZE; i++) {
                        cv_init(&tx->tx_cpu[c].tc_cv[i], NULL, CV_DEFAULT,
                            NULL);
                        list_create(&tx->tx_cpu[c].tc_callbacks[i],
                            sizeof (dmu_tx_callback_t),
                            offsetof(dmu_tx_callback_t, dcb_node));
                }
        }

        mutex_init(&tx->tx_sync_lock, NULL, MUTEX_DEFAULT, NULL);

        cv_init(&tx->tx_sync_more_cv, NULL, CV_DEFAULT, NULL);
        cv_init(&tx->tx_sync_done_cv, NULL, CV_DEFAULT, NULL);
        cv_init(&tx->tx_quiesce_more_cv, NULL, CV_DEFAULT, NULL);
        cv_init(&tx->tx_quiesce_done_cv, NULL, CV_DEFAULT, NULL);
        cv_init(&tx->tx_exit_cv, NULL, CV_DEFAULT, NULL);

        tx->tx_open_txg = txg;
}

/*
 * Close down the txg subsystem.
 */
void
txg_fini(dsl_pool_t *dp)
{
        tx_state_t *tx = &dp->dp_tx;
        int c;

        ASSERT(tx->tx_threads == 0);

        mutex_destroy(&tx->tx_sync_lock);

        cv_destroy(&tx->tx_sync_more_cv);
        cv_destroy(&tx->tx_sync_done_cv);
        cv_destroy(&tx->tx_quiesce_more_cv);
        cv_destroy(&tx->tx_quiesce_done_cv);
        cv_destroy(&tx->tx_exit_cv);

        for (c = 0; c < max_ncpus; c++) {
                int i;

                mutex_destroy(&tx->tx_cpu[c].tc_open_lock);
                mutex_destroy(&tx->tx_cpu[c].tc_lock);
                for (i = 0; i < TXG_SIZE; i++) {
                        cv_destroy(&tx->tx_cpu[c].tc_cv[i]);
                        list_destroy(&tx->tx_cpu[c].tc_callbacks[i]);
                }
        }

        if (tx->tx_commit_cb_taskq != NULL)
                taskq_destroy(tx->tx_commit_cb_taskq);

        kmem_free(tx->tx_cpu, max_ncpus * sizeof (tx_cpu_t));

        bzero(tx, sizeof (tx_state_t));
}

/*
 * Start syncing transaction groups.
 */
void
txg_sync_start(dsl_pool_t *dp)
{
        tx_state_t *tx = &dp->dp_tx;

        mutex_enter(&tx->tx_sync_lock);

        dprintf("pool %p\n", dp);

        ASSERT(tx->tx_threads == 0);

        tx->tx_threads = 2;

        tx->tx_quiesce_thread = thread_create(NULL, 0, txg_quiesce_thread,
            dp, 0, &p0, TS_RUN, minclsyspri);

        /*
         * The sync thread can need a larger-than-default stack size on
         * 32-bit x86.  This is due in part to nested pools and
         * scrub_visitbp() recursion.
         */
        tx->tx_sync_thread = thread_create(NULL, 32<<10, txg_sync_thread,
            dp, 0, &p0, TS_RUN, minclsyspri);

        mutex_exit(&tx->tx_sync_lock);
}

static void
txg_thread_enter(tx_state_t *tx, callb_cpr_t *cpr)
{
        CALLB_CPR_INIT(cpr, &tx->tx_sync_lock, callb_generic_cpr, FTAG);
        mutex_enter(&tx->tx_sync_lock);
}

static void
txg_thread_exit(tx_state_t *tx, callb_cpr_t *cpr, kthread_t **tpp)
{
        ASSERT(*tpp != NULL);
        *tpp = NULL;
        tx->tx_threads--;
        cv_broadcast(&tx->tx_exit_cv);
        CALLB_CPR_EXIT(cpr);            /* drops &tx->tx_sync_lock */
        thread_exit();
}

static void
txg_thread_wait(tx_state_t *tx, callb_cpr_t *cpr, kcondvar_t *cv, clock_t time)
{
        CALLB_CPR_SAFE_BEGIN(cpr);

        if (time)
                (void) cv_timedwait(cv, &tx->tx_sync_lock,
                    ddi_get_lbolt() + time);
        else
                cv_wait(cv, &tx->tx_sync_lock);

        CALLB_CPR_SAFE_END(cpr, &tx->tx_sync_lock);
}

/*
 * Stop syncing transaction groups.
 */
void
txg_sync_stop(dsl_pool_t *dp)
{
        tx_state_t *tx = &dp->dp_tx;

        dprintf("pool %p\n", dp);
        /*
         * Finish off any work in progress.
         */
        ASSERT(tx->tx_threads == 2);

        /*
         * We need to ensure that we've vacated the deferred space_maps.
         */
        txg_wait_synced(dp, tx->tx_open_txg + TXG_DEFER_SIZE);

        /*
         * Wake all sync threads and wait for them to die.
         */
        mutex_enter(&tx->tx_sync_lock);

        ASSERT(tx->tx_threads == 2);

        tx->tx_exiting = 1;

        cv_broadcast(&tx->tx_quiesce_more_cv);
        cv_broadcast(&tx->tx_quiesce_done_cv);
        cv_broadcast(&tx->tx_sync_more_cv);

        while (tx->tx_threads != 0)
                cv_wait(&tx->tx_exit_cv, &tx->tx_sync_lock);

        tx->tx_exiting = 0;

        mutex_exit(&tx->tx_sync_lock);
}

uint64_t
txg_hold_open(dsl_pool_t *dp, txg_handle_t *th)
{
        tx_state_t *tx = &dp->dp_tx;
        tx_cpu_t *tc = &tx->tx_cpu[CPU_SEQID];
        uint64_t txg;

        mutex_enter(&tc->tc_open_lock);
        txg = tx->tx_open_txg;

        mutex_enter(&tc->tc_lock);
        tc->tc_count[txg & TXG_MASK]++;
        mutex_exit(&tc->tc_lock);

        th->th_cpu = tc;
        th->th_txg = txg;

        return (txg);
}

void
txg_rele_to_quiesce(txg_handle_t *th)
{
        tx_cpu_t *tc = th->th_cpu;

        ASSERT(!MUTEX_HELD(&tc->tc_lock));
        mutex_exit(&tc->tc_open_lock);
}

void
txg_register_callbacks(txg_handle_t *th, list_t *tx_callbacks)
{
        tx_cpu_t *tc = th->th_cpu;
        int g = th->th_txg & TXG_MASK;

        mutex_enter(&tc->tc_lock);
        list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
        mutex_exit(&tc->tc_lock);
}

void
txg_rele_to_sync(txg_handle_t *th)
{
        tx_cpu_t *tc = th->th_cpu;
        int g = th->th_txg & TXG_MASK;

        mutex_enter(&tc->tc_lock);
        ASSERT(tc->tc_count[g] != 0);
        if (--tc->tc_count[g] == 0)
                cv_broadcast(&tc->tc_cv[g]);
        mutex_exit(&tc->tc_lock);

        th->th_cpu = NULL;   /* defensive */
}
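
/*
 * Taken together, the holds above bracket a transaction's life. Roughly what
 * dmu_tx.c does when assigning and committing a transaction is (a sketch for
 * reference only; "..." elides the real bookkeeping):
 *
 *	txg_handle_t th;
 *	uint64_t txg = txg_hold_open(dp, &th);	-- open txg cannot advance
 *	... reserve space, check throttles ...
 *	txg_rele_to_quiesce(&th);		-- txg may begin quiescing
 *	... apply the changes in memory under this txg ...
 *	txg_rele_to_sync(&th);			-- txg may finish quiescing
 *
 * txg_hold_open() returns with this CPU's tc_open_lock held, so
 * txg_rele_to_quiesce() must be called before doing anything that can block
 * for long; the count taken in txg_hold_open() and dropped in
 * txg_rele_to_sync() is what txg_quiesce() below waits on.
 */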

/*
 * Blocks until all transactions in the group are committed.
 *
 * On return, the transaction group has reached a stable state in which it can
 * then be passed off to the syncing context.
 */
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
        tx_state_t *tx = &dp->dp_tx;
        int g = txg & TXG_MASK;
        int c;

        /*
         * Grab all tc_open_locks so nobody else can get into this txg.
         */
        for (c = 0; c < max_ncpus; c++)
                mutex_enter(&tx->tx_cpu[c].tc_open_lock);

        ASSERT(txg == tx->tx_open_txg);
        tx->tx_open_txg++;

        DTRACE_PROBE2(txg__quiescing, dsl_pool_t *, dp, uint64_t, txg);
        DTRACE_PROBE2(txg__opened, dsl_pool_t *, dp, uint64_t, tx->tx_open_txg);

        /*
         * Now that we've incremented tx_open_txg, we can let threads
         * enter the next transaction group.
         */
        for (c = 0; c < max_ncpus; c++)
                mutex_exit(&tx->tx_cpu[c].tc_open_lock);

        /*
         * Quiesce the transaction group by waiting for everyone to call
         * txg_rele_to_sync().
         */
        for (c = 0; c < max_ncpus; c++) {
                tx_cpu_t *tc = &tx->tx_cpu[c];
                mutex_enter(&tc->tc_lock);
                while (tc->tc_count[g] != 0)
                        cv_wait(&tc->tc_cv[g], &tc->tc_lock);
                mutex_exit(&tc->tc_lock);
        }
}

static void
txg_do_callbacks(list_t *cb_list)
{
        dmu_tx_do_callbacks(cb_list, 0);

        list_destroy(cb_list);

        kmem_free(cb_list, sizeof (list_t));
}

/*
 * Dispatch the commit callbacks registered on this txg to worker threads.
 *
 * If no callbacks are registered for a given TXG, nothing happens.
 * This function creates a taskq for the associated pool, if needed.
 */
static void
txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
{
        int c;
        tx_state_t *tx = &dp->dp_tx;
        list_t *cb_list;

        for (c = 0; c < max_ncpus; c++) {
                tx_cpu_t *tc = &tx->tx_cpu[c];
                /*
                 * No need to lock tx_cpu_t at this point, since this can
                 * only be called once a txg has been synced.
                 */

                int g = txg & TXG_MASK;

                if (list_is_empty(&tc->tc_callbacks[g]))
                        continue;

                if (tx->tx_commit_cb_taskq == NULL) {
                        /*
                         * Commit callback taskq hasn't been created yet.
                         */
                        tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
                            max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
                            TASKQ_PREPOPULATE);
                }

                cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
                list_create(cb_list, sizeof (dmu_tx_callback_t),
                    offsetof(dmu_tx_callback_t, dcb_node));

                list_move_tail(cb_list, &tc->tc_callbacks[g]);

                (void) taskq_dispatch(tx->tx_commit_cb_taskq, (task_func_t *)
                    txg_do_callbacks, cb_list, TQ_SLEEP);
        }
}
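
/*
 * For reference, commit callbacks land on the tc_callbacks[] lists above via
 * dmu_tx_callback_register() in dmu_tx.c. A consumer that needs to know when
 * its changes have reached stable storage registers a callback before
 * committing; this is an illustrative sketch only (my_done() and my_arg are
 * placeholder names):
 *
 *	static void
 *	my_done(void *my_arg, int error)
 *	{
 *		-- error == 0: the transaction's txg has synced
 *		-- error != 0: the transaction did not reach stable storage
 *	}
 *
 *	dmu_tx_callback_register(tx, my_done, my_arg);
 *	dmu_tx_commit(tx);
 *
 * Once spa_sync() completes for that txg, txg_dispatch_callbacks() hands the
 * accumulated callbacks to the tx_commit_cb taskq, which invokes them via
 * dmu_tx_do_callbacks().
 */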

static void
txg_sync_thread(dsl_pool_t *dp)
{
        spa_t *spa = dp->dp_spa;
        tx_state_t *tx = &dp->dp_tx;
        callb_cpr_t cpr;
        uint64_t start, delta;

        txg_thread_enter(tx, &cpr);

        start = delta = 0;
        for (;;) {
                uint64_t timer, timeout = zfs_txg_timeout * hz;
                uint64_t txg;

                /*
                 * We sync when we're scanning, there's someone waiting
                 * on us, or the quiesce thread has handed off a txg to
                 * us, or we have reached our timeout.
                 */
                timer = (delta >= timeout ? 0 : timeout - delta);
                while (!dsl_scan_active(dp->dp_scan) &&
                    !tx->tx_exiting && timer > 0 &&
                    tx->tx_synced_txg >= tx->tx_sync_txg_waiting &&
                    tx->tx_quiesced_txg == 0) {
                        dprintf("waiting; tx_synced=%llu waiting=%llu dp=%p\n",
                            tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
                        txg_thread_wait(tx, &cpr, &tx->tx_sync_more_cv, timer);
                        delta = ddi_get_lbolt() - start;
                        timer = (delta > timeout ? 0 : timeout - delta);
                }

                /*
                 * Wait until the quiesce thread hands off a txg to us,
                 * prompting it to do so if necessary.
                 */
                while (!tx->tx_exiting && tx->tx_quiesced_txg == 0) {
                        if (tx->tx_quiesce_txg_waiting < tx->tx_open_txg+1)
                                tx->tx_quiesce_txg_waiting = tx->tx_open_txg+1;
                        cv_broadcast(&tx->tx_quiesce_more_cv);
                        txg_thread_wait(tx, &cpr, &tx->tx_quiesce_done_cv, 0);
                }

                if (tx->tx_exiting)
                        txg_thread_exit(tx, &cpr, &tx->tx_sync_thread);

                /*
                 * Consume the quiesced txg which has been handed off to
                 * us.  This may cause the quiescing thread to now be
                 * able to quiesce another txg, so we must signal it.
                 */
                txg = tx->tx_quiesced_txg;
                tx->tx_quiesced_txg = 0;
                tx->tx_syncing_txg = txg;
                DTRACE_PROBE2(txg__syncing, dsl_pool_t *, dp, uint64_t, txg);
                cv_broadcast(&tx->tx_quiesce_more_cv);

                dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
                    txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
                mutex_exit(&tx->tx_sync_lock);

                start = ddi_get_lbolt();
                spa_sync(spa, txg);
                delta = ddi_get_lbolt() - start;

                mutex_enter(&tx->tx_sync_lock);
                tx->tx_synced_txg = txg;
                tx->tx_syncing_txg = 0;
                DTRACE_PROBE2(txg__synced, dsl_pool_t *, dp, uint64_t, txg);
                cv_broadcast(&tx->tx_sync_done_cv);

                /*
                 * Dispatch commit callbacks to worker threads.
                 */
                txg_dispatch_callbacks(dp, txg);
        }
}

static void
txg_quiesce_thread(dsl_pool_t *dp)
{
        tx_state_t *tx = &dp->dp_tx;
        callb_cpr_t cpr;

        txg_thread_enter(tx, &cpr);

        for (;;) {
                uint64_t txg;

                /*
                 * We quiesce when there's someone waiting on us.
                 * However, we can only have one txg in "quiescing" or
                 * "quiesced, waiting to sync" state.  So we wait until
                 * the "quiesced, waiting to sync" txg has been consumed
                 * by the sync thread.
                 */
                while (!tx->tx_exiting &&
                    (tx->tx_open_txg >= tx->tx_quiesce_txg_waiting ||
                    tx->tx_quiesced_txg != 0))
                        txg_thread_wait(tx, &cpr, &tx->tx_quiesce_more_cv, 0);

                if (tx->tx_exiting)
                        txg_thread_exit(tx, &cpr, &tx->tx_quiesce_thread);

                txg = tx->tx_open_txg;
                dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
                    txg, tx->tx_quiesce_txg_waiting,
                    tx->tx_sync_txg_waiting);
                mutex_exit(&tx->tx_sync_lock);
                txg_quiesce(dp, txg);
                mutex_enter(&tx->tx_sync_lock);

                /*
                 * Hand this txg off to the sync thread.
                 */
                dprintf("quiesce done, handing off txg %llu\n", txg);
                tx->tx_quiesced_txg = txg;
                DTRACE_PROBE2(txg__quiesced, dsl_pool_t *, dp, uint64_t, txg);
                cv_broadcast(&tx->tx_sync_more_cv);
                cv_broadcast(&tx->tx_quiesce_done_cv);
        }
}

/*
 * Delay this thread by delay nanoseconds if we are still in the open
 * transaction group and there is already a waiting txg quiescing or quiesced.
 * Abort the delay if this txg stalls or enters the quiescing state.
 */
void
txg_delay(dsl_pool_t *dp, uint64_t txg, hrtime_t delay, hrtime_t resolution)
{
        tx_state_t *tx = &dp->dp_tx;
        hrtime_t start = gethrtime();

        /* don't delay if this txg could transition to quiescing immediately */
        if (tx->tx_open_txg > txg ||
            tx->tx_syncing_txg == txg-1 || tx->tx_synced_txg == txg-1)
                return;

        mutex_enter(&tx->tx_sync_lock);
        if (tx->tx_open_txg > txg || tx->tx_synced_txg == txg-1) {
                mutex_exit(&tx->tx_sync_lock);
                return;
        }

        while (gethrtime() - start < delay &&
            tx->tx_syncing_txg < txg-1 && !txg_stalled(dp)) {
                (void) cv_timedwait_hires(&tx->tx_quiesce_more_cv,
                    &tx->tx_sync_lock, delay, resolution, 0);
        }

        mutex_exit(&tx->tx_sync_lock);
}

void
txg_wait_synced(dsl_pool_t *dp, uint64_t txg)
{
        tx_state_t *tx = &dp->dp_tx;

        ASSERT(!dsl_pool_config_held(dp));

        mutex_enter(&tx->tx_sync_lock);
        ASSERT(tx->tx_threads == 2);
        if (txg == 0)
                txg = tx->tx_open_txg + TXG_DEFER_SIZE;
        if (tx->tx_sync_txg_waiting < txg)
                tx->tx_sync_txg_waiting = txg;
        dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
            txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
        while (tx->tx_synced_txg < txg) {
                dprintf("broadcasting sync more "
                    "tx_synced=%llu waiting=%llu dp=%p\n",
                    tx->tx_synced_txg, tx->tx_sync_txg_waiting, dp);
                cv_broadcast(&tx->tx_sync_more_cv);
                cv_wait(&tx->tx_sync_done_cv, &tx->tx_sync_lock);
        }
        mutex_exit(&tx->tx_sync_lock);
}

void
txg_wait_open(dsl_pool_t *dp, uint64_t txg)
{
        tx_state_t *tx = &dp->dp_tx;

        ASSERT(!dsl_pool_config_held(dp));

        mutex_enter(&tx->tx_sync_lock);
        ASSERT(tx->tx_threads == 2);
        if (txg == 0)
                txg = tx->tx_open_txg + 1;
        if (tx->tx_quiesce_txg_waiting < txg)
                tx->tx_quiesce_txg_waiting = txg;
        dprintf("txg=%llu quiesce_txg=%llu sync_txg=%llu\n",
            txg, tx->tx_quiesce_txg_waiting, tx->tx_sync_txg_waiting);
        while (tx->tx_open_txg < txg) {
                cv_broadcast(&tx->tx_quiesce_more_cv);
                cv_wait(&tx->tx_quiesce_done_cv, &tx->tx_sync_lock);
        }
        mutex_exit(&tx->tx_sync_lock);
}

boolean_t
txg_stalled(dsl_pool_t *dp)
{
        tx_state_t *tx = &dp->dp_tx;
        return (tx->tx_quiesce_txg_waiting > tx->tx_open_txg);
}

boolean_t
txg_sync_waiting(dsl_pool_t *dp)
{
        tx_state_t *tx = &dp->dp_tx;

        return (tx->tx_syncing_txg <= tx->tx_sync_txg_waiting ||
            tx->tx_quiesced_txg != 0);
}

/*
 * Per-txg object lists.
 */
void
txg_list_create(txg_list_t *tl, size_t offset)
{
        int t;

        mutex_init(&tl->tl_lock, NULL, MUTEX_DEFAULT, NULL);

        tl->tl_offset = offset;

        for (t = 0; t < TXG_SIZE; t++)
                tl->tl_head[t] = NULL;
}

void
txg_list_destroy(txg_list_t *tl)
{
        int t;

        for (t = 0; t < TXG_SIZE; t++)
                ASSERT(txg_list_empty(tl, t));

        mutex_destroy(&tl->tl_lock);
}

boolean_t
txg_list_empty(txg_list_t *tl, uint64_t txg)
{
        return (tl->tl_head[txg & TXG_MASK] == NULL);
}

/*
 * Add an entry to the list (unless it's already on the list).
 * Returns B_TRUE if it was actually added.
 */
boolean_t
txg_list_add(txg_list_t *tl, void *p, uint64_t txg)
{
        int t = txg & TXG_MASK;
        txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
        boolean_t add;

        mutex_enter(&tl->tl_lock);
        add = (tn->tn_member[t] == 0);
        if (add) {
                tn->tn_member[t] = 1;
                tn->tn_next[t] = tl->tl_head[t];
                tl->tl_head[t] = tn;
        }
        mutex_exit(&tl->tl_lock);

        return (add);
}

/*
 * Add an entry to the end of the list, unless it's already on the list.
 * (walks list to find end)
 * Returns B_TRUE if it was actually added.
 */
boolean_t
txg_list_add_tail(txg_list_t *tl, void *p, uint64_t txg)
{
        int t = txg & TXG_MASK;
        txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);
        boolean_t add;

        mutex_enter(&tl->tl_lock);
        add = (tn->tn_member[t] == 0);
        if (add) {
                txg_node_t **tp;

                for (tp = &tl->tl_head[t]; *tp != NULL; tp = &(*tp)->tn_next[t])
                        continue;

                tn->tn_member[t] = 1;
                tn->tn_next[t] = NULL;
                *tp = tn;
        }
        mutex_exit(&tl->tl_lock);

        return (add);
}

/*
 * Remove the head of the list and return it.
 */
void *
txg_list_remove(txg_list_t *tl, uint64_t txg)
{
        int t = txg & TXG_MASK;
        txg_node_t *tn;
        void *p = NULL;

        mutex_enter(&tl->tl_lock);
        if ((tn = tl->tl_head[t]) != NULL) {
                p = (char *)tn - tl->tl_offset;
                tl->tl_head[t] = tn->tn_next[t];
                tn->tn_next[t] = NULL;
                tn->tn_member[t] = 0;
        }
        mutex_exit(&tl->tl_lock);

        return (p);
}

/*
 * Remove a specific item from the list and return it.
 */
void *
txg_list_remove_this(txg_list_t *tl, void *p, uint64_t txg)
{
        int t = txg & TXG_MASK;
        txg_node_t *tn, **tp;

        mutex_enter(&tl->tl_lock);

        for (tp = &tl->tl_head[t]; (tn = *tp) != NULL; tp = &tn->tn_next[t]) {
                if ((char *)tn - tl->tl_offset == p) {
                        *tp = tn->tn_next[t];
                        tn->tn_next[t] = NULL;
                        tn->tn_member[t] = 0;
                        mutex_exit(&tl->tl_lock);
                        return (p);
                }
        }

        mutex_exit(&tl->tl_lock);

        return (NULL);
}

boolean_t
txg_list_member(txg_list_t *tl, void *p, uint64_t txg)
{
        int t = txg & TXG_MASK;
        txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

        return (tn->tn_member[t] != 0);
}

/*
 * Walk a txg list -- only safe if you know it's not changing.
 */
void *
txg_list_head(txg_list_t *tl, uint64_t txg)
{
        int t = txg & TXG_MASK;
        txg_node_t *tn = tl->tl_head[t];

        return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}

void *
txg_list_next(txg_list_t *tl, void *p, uint64_t txg)
{
        int t = txg & TXG_MASK;
        txg_node_t *tn = (txg_node_t *)((char *)p + tl->tl_offset);

        tn = tn->tn_next[t];

        return (tn == NULL ? NULL : (char *)tn - tl->tl_offset);
}
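
/*
 * Usage note: a txg_list embeds its linkage in the client object itself; each
 * object carries a txg_node_t, and the list is created with the offset of
 * that node within the object. An illustrative sketch (my_obj_t and its
 * field names are placeholders, not real consumers):
 *
 *	typedef struct my_obj {
 *		...
 *		txg_node_t	mo_node;
 *	} my_obj_t;
 *
 *	txg_list_create(&tl, offsetof(my_obj_t, mo_node));
 *	(void) txg_list_add(&tl, obj, txg);
 *
 *	while ((obj = txg_list_remove(&tl, txg)) != NULL)
 *		-- process obj as part of syncing this txg
 *
 * Because tn_member[] and tn_next[] are indexed by txg & TXG_MASK, the same
 * object can be on the list for several different txgs at once; dsl_pool.c,
 * for example, tracks dirty datasets and dirs this way.
 */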