	mutex_enter(&tc->tc_lock);
	list_move_tail(&tc->tc_callbacks[g], tx_callbacks);
	mutex_exit(&tc->tc_lock);
}

void
txg_rele_to_sync(txg_handle_t *th)
{
	tx_cpu_t *tc = th->th_cpu;
	int g = th->th_txg & TXG_MASK;

	mutex_enter(&tc->tc_lock);
	ASSERT(tc->tc_count[g] != 0);
	if (--tc->tc_count[g] == 0)
		cv_broadcast(&tc->tc_cv[g]);
	mutex_exit(&tc->tc_lock);

	th->th_cpu = NULL;	/* defensive */
}
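
/*
 * Illustrative sketch of a typical caller: dirtying work is bracketed
 * with txg_hold_open() and txg_rele_to_sync(), roughly:
 *
 *	txg_handle_t th;
 *	uint64_t txg = txg_hold_open(dp, &th);
 *	... modify data belonging to txg ...
 *	txg_rele_to_sync(&th);
 *
 * Until the matching txg_rele_to_sync(), tc_count[g] stays nonzero and
 * txg_quiesce() below keeps waiting on tc_cv[g].
 */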

/*
 * Blocks until all transactions in the group are committed.
 *
 * On return, the transaction group has reached a stable state in which it can
 * then be passed off to the syncing context.
 */
static void
txg_quiesce(dsl_pool_t *dp, uint64_t txg)
{
	tx_state_t *tx = &dp->dp_tx;
	int g = txg & TXG_MASK;
	int c;

	/*
	 * Grab all tx_cpu locks so nobody else can get into this txg.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_enter(&tx->tx_cpu[c].tc_lock);

	ASSERT(txg == tx->tx_open_txg);
	tx->tx_open_txg++;

	DTRACE_PROBE2(txg__quiescing, dsl_pool_t *, dp, uint64_t, txg);
	DTRACE_PROBE2(txg__opened, dsl_pool_t *, dp, uint64_t, tx->tx_open_txg);

	/*
	 * Now that we've incremented tx_open_txg, we can let threads
	 * enter the next transaction group.
	 */
	for (c = 0; c < max_ncpus; c++)
		mutex_exit(&tx->tx_cpu[c].tc_lock);

	/*
	 * Quiesce the transaction group by waiting for everyone to txg_exit().
	 */
	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		mutex_enter(&tc->tc_lock);
		while (tc->tc_count[g] != 0)
			cv_wait(&tc->tc_cv[g], &tc->tc_lock);
		mutex_exit(&tc->tc_lock);
	}
}
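
/*
 * Note on the (txg & TXG_MASK) indexing used throughout this file:
 * per-txg state such as tc_count[] and tc_callbacks[] lives in small
 * ring buffers of TXG_SIZE slots (TXG_SIZE is 4 and TXG_MASK is
 * TXG_SIZE - 1 in sys/txg.h), so the 64-bit txg number reduces to a
 * slot index; e.g. txg 37 maps to slot 37 & 3 == 1.  This is safe
 * because at most TXG_SIZE transaction groups are in flight at once.
 */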

static void
txg_do_callbacks(list_t *cb_list)
{
	/*
	 * The second argument is the error handed to each callback;
	 * zero means the txg committed successfully.
	 */
	dmu_tx_do_callbacks(cb_list, 0);

	list_destroy(cb_list);

	kmem_free(cb_list, sizeof (list_t));
}
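
/*
 * Illustrative sketch: the entries consumed above reach tc_callbacks[]
 * when a caller registers a commit callback with
 * dmu_tx_callback_register().  With hypothetical names my_commit_cb
 * and my_arg:
 *
 *	static void
 *	my_commit_cb(void *arg, int error)
 *	{
 *		if (error != 0)
 *			... the transaction did not commit ...
 *		else
 *			... the data is now on stable storage ...
 *	}
 *
 *	dmu_tx_callback_register(tx, my_commit_cb, my_arg);
 */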

/*
 * Dispatch the commit callbacks registered on this txg to worker threads.
 *
 * If no callbacks are registered for a given TXG, nothing happens.
 * This function creates a taskq for the associated pool, if needed.
 */
static void
txg_dispatch_callbacks(dsl_pool_t *dp, uint64_t txg)
{
	int c;
	tx_state_t *tx = &dp->dp_tx;
	list_t *cb_list;

	for (c = 0; c < max_ncpus; c++) {
		tx_cpu_t *tc = &tx->tx_cpu[c];
		/*
		 * No need to lock tx_cpu_t at this point, since this can
		 * only be called once a txg has been synced.
		 */

		int g = txg & TXG_MASK;

		if (list_is_empty(&tc->tc_callbacks[g]))
			continue;

		if (tx->tx_commit_cb_taskq == NULL) {
			/*
			 * Commit callback taskq hasn't been created yet.
			 */
			tx->tx_commit_cb_taskq = taskq_create("tx_commit_cb",
			    max_ncpus, minclsyspri, max_ncpus, max_ncpus * 2,
			    TASKQ_PREPOPULATE);
		}
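		/*
		 * taskq_create() arguments, as understood here: max_ncpus
		 * worker threads at minclsyspri priority; the entry counts
		 * (max_ncpus and max_ncpus * 2) bound the taskq's
		 * preallocated entry cache, which TASKQ_PREPOPULATE fills
		 * at creation time.
		 */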

		cb_list = kmem_alloc(sizeof (list_t), KM_SLEEP);
		list_create(cb_list, sizeof (dmu_tx_callback_t),
		    offsetof(dmu_tx_callback_t, dcb_node));

		list_move_tail(&tc->tc_callbacks[g], cb_list);