        mutex_enter(&tc->tc_lock);
        list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
        mutex_exit(&tc->tc_lock);
}

void
txg_rele_to_sync(txg_handle_t *th)
{
        tx_cpu_t *tc = th->th_cpu;
        int g = th->th_txg & TXG_MASK;

        mutex_enter(&tc->tc_lock);
        ASSERT(tc->tc_count[g] != 0);
        if (--tc->tc_count[g] == 0)
                cv_broadcast(&tc->tc_cv[g]);
        mutex_exit(&tc->tc_lock);

        th->th_cpu = NULL; /* defensive */
}

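/*
 * Illustrative sketch, not part of the original file: how a writer is
 * expected to pair the hold/release calls above.  txg_hold_open() and
 * txg_rele_to_quiesce() are defined earlier in txg.c; the wrapper below
 * and its name are hypothetical.
 */
static void
example_txg_hold_cycle(dsl_pool_t *dp)
{
        txg_handle_t th;
        uint64_t txg;

        /* Join the currently open txg; this bumps tc_count[txg & TXG_MASK]. */
        txg = txg_hold_open(dp, &th);

        /* Drop the per-CPU lock so txg_quiesce() can advance tx_open_txg. */
        txg_rele_to_quiesce(&th);

        /* ... dirty the data that belongs to this txg ... */

        /* Release the hold; the last release wakes txg_quiesce() via tc_cv. */
        txg_rele_to_sync(&th);

        (void) txg;
}
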
/*
 * Quiesce, v.: to render temporarily inactive or disabled
 *
 * Blocks until all transactions in the group are committed.
 *
 * On return, the transaction group has reached a stable state in which it can
 * then be passed off to the syncing context.
 */
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
        tx_state_t *tx = &dp->dp_tx;
        int g = txg & TXG_MASK;
        int c;

        /*
         * Grab all tx_cpu locks so nobody else can get into this txg.
         */
        for (c = 0; c < max_ncpus; c++)
                mutex_enter(&tx->tx_cpu[c].tc_lock);

        ASSERT(txg == tx->tx_open_txg);
        tx->tx_open_txg++;

        DTRACE_PROBE2(txg__quiescing, dsl_pool_t *, dp, uint64_t, txg);
        DTRACE_PROBE2(txg__opened, dsl_pool_t *, dp, uint64_t, tx->tx_open_txg);

        /*
         * Now that we've incremented tx_open_txg, we can let threads
         * enter the next transaction group.
         */
        for (c = 0; c < max_ncpus; c++)
                mutex_exit(&tx->tx_cpu[c].tc_lock);

        /*
         * Quiesce the transaction group by waiting for everyone to
         * call txg_rele_to_sync().
         */
        for (c = 0; c < max_ncpus; c++) {
                tx_cpu_t *tc = &tx->tx_cpu[c];
                mutex_enter(&tc->tc_lock);
                while (tc->tc_count[g] != 0)
                        cv_wait(&tc->tc_cv[g], &tc->tc_lock);
                mutex_exit(&tc->tc_lock);
        }
}

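/*
 * Aside, not part of the original file: the "g" index above comes from
 * TXG_MASK in txg.h.  Assuming TXG_SIZE is 4 and TXG_MASK is (TXG_SIZE - 1),
 * only a few txgs are ever in flight, so a txg number maps to its slot in
 * the per-CPU arrays with a mask rather than a modulo.
 */
static inline int
example_txg_slot(uint64_t txg)
{
        return (txg & TXG_MASK);        /* e.g. txg 1026 -> slot 2 */
}
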
/*
 * Invoke the commit callbacks on this list, then dispose of the list itself.
 */
static void
txg_do_callbacks(list_t *cb_list)
{
        dmu_tx_do_callbacks(cb_list, 0);

        list_destroy(cb_list);

        kmem_free(cb_list, sizeof (list_t));
}

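/*
 * Illustrative sketch, not part of the original file: the callbacks run by
 * txg_do_callbacks() are registered on an open transaction with
 * dmu_tx_callback_register() (defined in dmu_tx.c) and reach tc_callbacks[]
 * through txg_register_callbacks() above.  The callback function and its
 * argument below are hypothetical.
 */
static void
example_commit_cb(void *arg, int error)
{
        /* error is nonzero if the txg was aborted rather than committed */
}

static void
example_register_commit_cb(dmu_tx_t *tx, void *arg)
{
        /* Queued on the tx now; runs after the txg has synced to disk. */
        dmu_tx_callback_register(tx, example_commit_cb, arg);
}
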
/*
 * Dispatch the commit callbacks registered on this txg to worker threads.
 *
 * If no callbacks are registered for a given TXG, nothing happens.
 * This function creates a taskq for the associated pool, if needed.
 */
static void
txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
{
        int c;
        tx_state_t *tx = &dp->dp_tx;
        list_t *cb_list;

        for (c = 0; c < max_ncpus; c++) {
                tx_cpu_t *tc = &tx->tx_cpu[c];
                /*
                 * No need to lock tx_cpu_t at this point, since this can
                 * only be called once a txg has been synced.
                 */

                int g = txg & TXG_MASK;

                if (list_is_empty(&tc->tc_callbacks[g]))
                        continue;

                if (tx->tx_commit_cb_taskq == NULL) {
                        /*
                         * Commit callback taskq hasn't been created yet.
                         */
                        tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
                            max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
                            TASKQ_PREPOPULATE);
                }

                cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
                list_create(cb_list, sizeof (dmu_tx_callback_t),
                    offsetof(dmu_tx_callback_t, dcb_node));

                list_move_tail(cb_list, &tc->tc_callbacks[g]);