Print this page
    
3006 VERIFY[S,U,P] and ASSERT[S,U,P] frequently check if first argument is zero
    
      
        | Split | 
	Close | 
      
      | Expand all | 
      | Collapse all | 
    
    
          --- old/usr/src/uts/common/fs/zfs/zil.c
          +++ new/usr/src/uts/common/fs/zfs/zil.c
   1    1  /*
   2    2   * CDDL HEADER START
   3    3   *
   4    4   * The contents of this file are subject to the terms of the
   5    5   * Common Development and Distribution License (the "License").
   6    6   * You may not use this file except in compliance with the License.
   7    7   *
   8    8   * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9    9   * or http://www.opensolaris.org/os/licensing.
  10   10   * See the License for the specific language governing permissions
  11   11   * and limitations under the License.
  12   12   *
  
    | 
      ↓ open down ↓ | 
    12 lines elided | 
    
      ↑ open up ↑ | 
  
  13   13   * When distributing Covered Code, include this CDDL HEADER in each
  14   14   * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15   15   * If applicable, add the following below this CDDL HEADER, with the
  16   16   * fields enclosed by brackets "[]" replaced with your own identifying
  17   17   * information: Portions Copyright [yyyy] [name of copyright owner]
  18   18   *
  19   19   * CDDL HEADER END
  20   20   */
  21   21  /*
  22   22   * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
  23      - * Copyright (c) 2011 by Delphix. All rights reserved.
       23 + * Copyright (c) 2012 by Delphix. All rights reserved.
  24   24   */
  25   25  
  26   26  /* Portions Copyright 2010 Robert Milkowski */
  27   27  
  28   28  #include <sys/zfs_context.h>
  29   29  #include <sys/spa.h>
  30   30  #include <sys/dmu.h>
  31   31  #include <sys/zap.h>
  32   32  #include <sys/arc.h>
  33   33  #include <sys/stat.h>
  34   34  #include <sys/resource.h>
  35   35  #include <sys/zil.h>
  36   36  #include <sys/zil_impl.h>
  37   37  #include <sys/dsl_dataset.h>
  38   38  #include <sys/vdev_impl.h>
  39   39  #include <sys/dmu_tx.h>
  40   40  #include <sys/dsl_pool.h>
  41   41  
  42   42  /*
  43   43   * The zfs intent log (ZIL) saves transaction records of system calls
  44   44   * that change the file system in memory with enough information
  45   45   * to be able to replay them. These are stored in memory until
  46   46   * either the DMU transaction group (txg) commits them to the stable pool
  47   47   * and they can be discarded, or they are flushed to the stable log
  48   48   * (also in the pool) due to a fsync, O_DSYNC or other synchronous
  49   49   * requirement. In the event of a panic or power fail then those log
  50   50   * records (transactions) are replayed.
  51   51   *
  52   52   * There is one ZIL per file system. Its on-disk (pool) format consists
  53   53   * of 3 parts:
  54   54   *
  55   55   *      - ZIL header
  56   56   *      - ZIL blocks
  57   57   *      - ZIL records
  58   58   *
  59   59   * A log record holds a system call transaction. Log blocks can
  60   60   * hold many log records and the blocks are chained together.
  61   61   * Each ZIL block contains a block pointer (blkptr_t) to the next
  62   62   * ZIL block in the chain. The ZIL header points to the first
  63   63   * block in the chain. Note there is not a fixed place in the pool
  64   64   * to hold blocks. They are dynamically allocated and freed as
  65   65   * needed from the blocks available. Figure X shows the ZIL structure:
  66   66   */
  67   67  
  68   68  /*
  69   69   * This global ZIL switch affects all pools
  70   70   */
  71   71  int zil_replay_disable = 0;    /* disable intent logging replay */
  72   72  
  73   73  /*
  74   74   * Tunable parameter for debugging or performance analysis.  Setting
  75   75   * zfs_nocacheflush will cause corruption on power loss if a volatile
  76   76   * out-of-order write cache is enabled.
  77   77   */
  78   78  boolean_t zfs_nocacheflush = B_FALSE;
  79   79  
  80   80  static kmem_cache_t *zil_lwb_cache;
  81   81  
  82   82  static void zil_async_to_sync(zilog_t *zilog, uint64_t foid);
  83   83  
  84   84  #define LWB_EMPTY(lwb) ((BP_GET_LSIZE(&lwb->lwb_blk) - \
  85   85      sizeof (zil_chain_t)) == (lwb->lwb_sz - lwb->lwb_nused))
  86   86  
  87   87  
  88   88  /*
  89   89   * ziltest is by and large an ugly hack, but very useful in
  90   90   * checking replay without tedious work.
  91   91   * When running ziltest we want to keep all itx's and so maintain
  92   92   * a single list in the zl_itxg[] that uses a high txg: ZILTEST_TXG
  93   93   * We subtract TXG_CONCURRENT_STATES to allow for common code.
  94   94   */
  95   95  #define ZILTEST_TXG (UINT64_MAX - TXG_CONCURRENT_STATES)
  96   96  
  97   97  static int
  98   98  zil_bp_compare(const void *x1, const void *x2)
  99   99  {
 100  100          const dva_t *dva1 = &((zil_bp_node_t *)x1)->zn_dva;
 101  101          const dva_t *dva2 = &((zil_bp_node_t *)x2)->zn_dva;
 102  102  
 103  103          if (DVA_GET_VDEV(dva1) < DVA_GET_VDEV(dva2))
 104  104                  return (-1);
 105  105          if (DVA_GET_VDEV(dva1) > DVA_GET_VDEV(dva2))
 106  106                  return (1);
 107  107  
 108  108          if (DVA_GET_OFFSET(dva1) < DVA_GET_OFFSET(dva2))
 109  109                  return (-1);
 110  110          if (DVA_GET_OFFSET(dva1) > DVA_GET_OFFSET(dva2))
 111  111                  return (1);
 112  112  
 113  113          return (0);
 114  114  }
 115  115  
 116  116  static void
 117  117  zil_bp_tree_init(zilog_t *zilog)
 118  118  {
 119  119          avl_create(&zilog->zl_bp_tree, zil_bp_compare,
 120  120              sizeof (zil_bp_node_t), offsetof(zil_bp_node_t, zn_node));
 121  121  }
 122  122  
 123  123  static void
 124  124  zil_bp_tree_fini(zilog_t *zilog)
 125  125  {
 126  126          avl_tree_t *t = &zilog->zl_bp_tree;
 127  127          zil_bp_node_t *zn;
 128  128          void *cookie = NULL;
 129  129  
 130  130          while ((zn = avl_destroy_nodes(t, &cookie)) != NULL)
 131  131                  kmem_free(zn, sizeof (zil_bp_node_t));
 132  132  
 133  133          avl_destroy(t);
 134  134  }
 135  135  
 136  136  int
 137  137  zil_bp_tree_add(zilog_t *zilog, const blkptr_t *bp)
 138  138  {
 139  139          avl_tree_t *t = &zilog->zl_bp_tree;
 140  140          const dva_t *dva = BP_IDENTITY(bp);
 141  141          zil_bp_node_t *zn;
 142  142          avl_index_t where;
 143  143  
 144  144          if (avl_find(t, dva, &where) != NULL)
 145  145                  return (EEXIST);
 146  146  
 147  147          zn = kmem_alloc(sizeof (zil_bp_node_t), KM_SLEEP);
 148  148          zn->zn_dva = *dva;
 149  149          avl_insert(t, zn, where);
 150  150  
 151  151          return (0);
 152  152  }
 153  153  
 154  154  static zil_header_t *
 155  155  zil_header_in_syncing_context(zilog_t *zilog)
 156  156  {
 157  157          return ((zil_header_t *)zilog->zl_header);
 158  158  }
 159  159  
 160  160  static void
 161  161  zil_init_log_chain(zilog_t *zilog, blkptr_t *bp)
 162  162  {
 163  163          zio_cksum_t *zc = &bp->blk_cksum;
 164  164  
 165  165          zc->zc_word[ZIL_ZC_GUID_0] = spa_get_random(-1ULL);
 166  166          zc->zc_word[ZIL_ZC_GUID_1] = spa_get_random(-1ULL);
 167  167          zc->zc_word[ZIL_ZC_OBJSET] = dmu_objset_id(zilog->zl_os);
 168  168          zc->zc_word[ZIL_ZC_SEQ] = 1ULL;
 169  169  }
 170  170  
 171  171  /*
 172  172   * Read a log block and make sure it's valid.
 173  173   */
 174  174  static int
 175  175  zil_read_log_block(zilog_t *zilog, const blkptr_t *bp, blkptr_t *nbp, void *dst,
 176  176      char **end)
 177  177  {
 178  178          enum zio_flag zio_flags = ZIO_FLAG_CANFAIL;
 179  179          uint32_t aflags = ARC_WAIT;
 180  180          arc_buf_t *abuf = NULL;
 181  181          zbookmark_t zb;
 182  182          int error;
 183  183  
 184  184          if (zilog->zl_header->zh_claim_txg == 0)
 185  185                  zio_flags |= ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB;
 186  186  
 187  187          if (!(zilog->zl_header->zh_flags & ZIL_CLAIM_LR_SEQ_VALID))
 188  188                  zio_flags |= ZIO_FLAG_SPECULATIVE;
 189  189  
 190  190          SET_BOOKMARK(&zb, bp->blk_cksum.zc_word[ZIL_ZC_OBJSET],
 191  191              ZB_ZIL_OBJECT, ZB_ZIL_LEVEL, bp->blk_cksum.zc_word[ZIL_ZC_SEQ]);
 192  192  
 193  193          error = dsl_read_nolock(NULL, zilog->zl_spa, bp, arc_getbuf_func, &abuf,
 194  194              ZIO_PRIORITY_SYNC_READ, zio_flags, &aflags, &zb);
 195  195  
 196  196          if (error == 0) {
 197  197                  zio_cksum_t cksum = bp->blk_cksum;
 198  198  
 199  199                  /*
 200  200                   * Validate the checksummed log block.
 201  201                   *
 202  202                   * Sequence numbers should be... sequential.  The checksum
 203  203                   * verifier for the next block should be bp's checksum plus 1.
 204  204                   *
 205  205                   * Also check the log chain linkage and size used.
 206  206                   */
 207  207                  cksum.zc_word[ZIL_ZC_SEQ]++;
 208  208  
 209  209                  if (BP_GET_CHECKSUM(bp) == ZIO_CHECKSUM_ZILOG2) {
 210  210                          zil_chain_t *zilc = abuf->b_data;
 211  211                          char *lr = (char *)(zilc + 1);
 212  212                          uint64_t len = zilc->zc_nused - sizeof (zil_chain_t);
 213  213  
 214  214                          if (bcmp(&cksum, &zilc->zc_next_blk.blk_cksum,
 215  215                              sizeof (cksum)) || BP_IS_HOLE(&zilc->zc_next_blk)) {
 216  216                                  error = ECKSUM;
 217  217                          } else {
 218  218                                  bcopy(lr, dst, len);
 219  219                                  *end = (char *)dst + len;
 220  220                                  *nbp = zilc->zc_next_blk;
 221  221                          }
 222  222                  } else {
 223  223                          char *lr = abuf->b_data;
 224  224                          uint64_t size = BP_GET_LSIZE(bp);
 225  225                          zil_chain_t *zilc = (zil_chain_t *)(lr + size) - 1;
 226  226  
 227  227                          if (bcmp(&cksum, &zilc->zc_next_blk.blk_cksum,
 228  228                              sizeof (cksum)) || BP_IS_HOLE(&zilc->zc_next_blk) ||
 229  229                              (zilc->zc_nused > (size - sizeof (*zilc)))) {
 230  230                                  error = ECKSUM;
 231  231                          } else {
 232  232                                  bcopy(lr, dst, zilc->zc_nused);
 233  233                                  *end = (char *)dst + zilc->zc_nused;
 234  234                                  *nbp = zilc->zc_next_blk;
 235  235                          }
 236  236                  }
 237  237  
 238  238                  VERIFY(arc_buf_remove_ref(abuf, &abuf) == 1);
 239  239          }
 240  240  
 241  241          return (error);
 242  242  }
 243  243  
 244  244  /*
 245  245   * Read a TX_WRITE log data block.
 246  246   */
 247  247  static int
 248  248  zil_read_log_data(zilog_t *zilog, const lr_write_t *lr, void *wbuf)
 249  249  {
 250  250          enum zio_flag zio_flags = ZIO_FLAG_CANFAIL;
 251  251          const blkptr_t *bp = &lr->lr_blkptr;
 252  252          uint32_t aflags = ARC_WAIT;
 253  253          arc_buf_t *abuf = NULL;
 254  254          zbookmark_t zb;
 255  255          int error;
 256  256  
 257  257          if (BP_IS_HOLE(bp)) {
 258  258                  if (wbuf != NULL)
 259  259                          bzero(wbuf, MAX(BP_GET_LSIZE(bp), lr->lr_length));
 260  260                  return (0);
 261  261          }
 262  262  
 263  263          if (zilog->zl_header->zh_claim_txg == 0)
 264  264                  zio_flags |= ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB;
 265  265  
 266  266          SET_BOOKMARK(&zb, dmu_objset_id(zilog->zl_os), lr->lr_foid,
 267  267              ZB_ZIL_LEVEL, lr->lr_offset / BP_GET_LSIZE(bp));
 268  268  
 269  269          error = arc_read_nolock(NULL, zilog->zl_spa, bp, arc_getbuf_func, &abuf,
 270  270              ZIO_PRIORITY_SYNC_READ, zio_flags, &aflags, &zb);
 271  271  
 272  272          if (error == 0) {
 273  273                  if (wbuf != NULL)
 274  274                          bcopy(abuf->b_data, wbuf, arc_buf_size(abuf));
 275  275                  (void) arc_buf_remove_ref(abuf, &abuf);
 276  276          }
 277  277  
 278  278          return (error);
 279  279  }
 280  280  
 281  281  /*
 282  282   * Parse the intent log, and call parse_func for each valid record within.
 283  283   */
 284  284  int
 285  285  zil_parse(zilog_t *zilog, zil_parse_blk_func_t *parse_blk_func,
 286  286      zil_parse_lr_func_t *parse_lr_func, void *arg, uint64_t txg)
 287  287  {
 288  288          const zil_header_t *zh = zilog->zl_header;
 289  289          boolean_t claimed = !!zh->zh_claim_txg;
 290  290          uint64_t claim_blk_seq = claimed ? zh->zh_claim_blk_seq : UINT64_MAX;
 291  291          uint64_t claim_lr_seq = claimed ? zh->zh_claim_lr_seq : UINT64_MAX;
 292  292          uint64_t max_blk_seq = 0;
 293  293          uint64_t max_lr_seq = 0;
 294  294          uint64_t blk_count = 0;
 295  295          uint64_t lr_count = 0;
 296  296          blkptr_t blk, next_blk;
 297  297          char *lrbuf, *lrp;
 298  298          int error = 0;
 299  299  
 300  300          /*
 301  301           * Old logs didn't record the maximum zh_claim_lr_seq.
 302  302           */
 303  303          if (!(zh->zh_flags & ZIL_CLAIM_LR_SEQ_VALID))
 304  304                  claim_lr_seq = UINT64_MAX;
 305  305  
 306  306          /*
 307  307           * Starting at the block pointed to by zh_log we read the log chain.
 308  308           * For each block in the chain we strongly check that block to
 309  309           * ensure its validity.  We stop when an invalid block is found.
 310  310           * For each block pointer in the chain we call parse_blk_func().
 311  311           * For each record in each valid block we call parse_lr_func().
 312  312           * If the log has been claimed, stop if we encounter a sequence
 313  313           * number greater than the highest claimed sequence number.
 314  314           */
 315  315          lrbuf = zio_buf_alloc(SPA_MAXBLOCKSIZE);
 316  316          zil_bp_tree_init(zilog);
 317  317  
 318  318          for (blk = zh->zh_log; !BP_IS_HOLE(&blk); blk = next_blk) {
 319  319                  uint64_t blk_seq = blk.blk_cksum.zc_word[ZIL_ZC_SEQ];
 320  320                  int reclen;
 321  321                  char *end;
 322  322  
 323  323                  if (blk_seq > claim_blk_seq)
 324  324                          break;
 325  325                  if ((error = parse_blk_func(zilog, &blk, arg, txg)) != 0)
 326  326                          break;
 327  327                  ASSERT3U(max_blk_seq, <, blk_seq);
 328  328                  max_blk_seq = blk_seq;
 329  329                  blk_count++;
 330  330  
 331  331                  if (max_lr_seq == claim_lr_seq && max_blk_seq == claim_blk_seq)
 332  332                          break;
 333  333  
 334  334                  error = zil_read_log_block(zilog, &blk, &next_blk, lrbuf, &end);
 335  335                  if (error)
 336  336                          break;
 337  337  
 338  338                  for (lrp = lrbuf; lrp < end; lrp += reclen) {
 339  339                          lr_t *lr = (lr_t *)lrp;
 340  340                          reclen = lr->lrc_reclen;
 341  341                          ASSERT3U(reclen, >=, sizeof (lr_t));
 342  342                          if (lr->lrc_seq > claim_lr_seq)
 343  343                                  goto done;
 344  344                          if ((error = parse_lr_func(zilog, lr, arg, txg)) != 0)
 345  345                                  goto done;
 346  346                          ASSERT3U(max_lr_seq, <, lr->lrc_seq);
 347  347                          max_lr_seq = lr->lrc_seq;
 348  348                          lr_count++;
 349  349                  }
 350  350          }
 351  351  done:
 352  352          zilog->zl_parse_error = error;
 353  353          zilog->zl_parse_blk_seq = max_blk_seq;
 354  354          zilog->zl_parse_lr_seq = max_lr_seq;
 355  355          zilog->zl_parse_blk_count = blk_count;
 356  356          zilog->zl_parse_lr_count = lr_count;
 357  357  
 358  358          ASSERT(!claimed || !(zh->zh_flags & ZIL_CLAIM_LR_SEQ_VALID) ||
 359  359              (max_blk_seq == claim_blk_seq && max_lr_seq == claim_lr_seq));
 360  360  
 361  361          zil_bp_tree_fini(zilog);
 362  362          zio_buf_free(lrbuf, SPA_MAXBLOCKSIZE);
 363  363  
 364  364          return (error);
 365  365  }
 366  366  
 367  367  static int
 368  368  zil_claim_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t first_txg)
 369  369  {
 370  370          /*
 371  371           * Claim log block if not already committed and not already claimed.
 372  372           * If tx == NULL, just verify that the block is claimable.
 373  373           */
 374  374          if (bp->blk_birth < first_txg || zil_bp_tree_add(zilog, bp) != 0)
 375  375                  return (0);
 376  376  
 377  377          return (zio_wait(zio_claim(NULL, zilog->zl_spa,
 378  378              tx == NULL ? 0 : first_txg, bp, spa_claim_notify, NULL,
 379  379              ZIO_FLAG_CANFAIL | ZIO_FLAG_SPECULATIVE | ZIO_FLAG_SCRUB)));
 380  380  }
 381  381  
 382  382  static int
 383  383  zil_claim_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t first_txg)
 384  384  {
 385  385          lr_write_t *lr = (lr_write_t *)lrc;
 386  386          int error;
 387  387  
 388  388          if (lrc->lrc_txtype != TX_WRITE)
 389  389                  return (0);
 390  390  
 391  391          /*
 392  392           * If the block is not readable, don't claim it.  This can happen
 393  393           * in normal operation when a log block is written to disk before
 394  394           * some of the dmu_sync() blocks it points to.  In this case, the
 395  395           * transaction cannot have been committed to anyone (we would have
 396  396           * waited for all writes to be stable first), so it is semantically
 397  397           * correct to declare this the end of the log.
 398  398           */
 399  399          if (lr->lr_blkptr.blk_birth >= first_txg &&
 400  400              (error = zil_read_log_data(zilog, lr, NULL)) != 0)
 401  401                  return (error);
 402  402          return (zil_claim_log_block(zilog, &lr->lr_blkptr, tx, first_txg));
 403  403  }
 404  404  
 405  405  /* ARGSUSED */
 406  406  static int
 407  407  zil_free_log_block(zilog_t *zilog, blkptr_t *bp, void *tx, uint64_t claim_txg)
 408  408  {
 409  409          zio_free_zil(zilog->zl_spa, dmu_tx_get_txg(tx), bp);
 410  410  
 411  411          return (0);
 412  412  }
 413  413  
 414  414  static int
 415  415  zil_free_log_record(zilog_t *zilog, lr_t *lrc, void *tx, uint64_t claim_txg)
 416  416  {
 417  417          lr_write_t *lr = (lr_write_t *)lrc;
 418  418          blkptr_t *bp = &lr->lr_blkptr;
 419  419  
 420  420          /*
 421  421           * If we previously claimed it, we need to free it.
 422  422           */
 423  423          if (claim_txg != 0 && lrc->lrc_txtype == TX_WRITE &&
 424  424              bp->blk_birth >= claim_txg && zil_bp_tree_add(zilog, bp) == 0)
 425  425                  zio_free(zilog->zl_spa, dmu_tx_get_txg(tx), bp);
 426  426  
 427  427          return (0);
 428  428  }
 429  429  
 430  430  static lwb_t *
 431  431  zil_alloc_lwb(zilog_t *zilog, blkptr_t *bp, uint64_t txg)
 432  432  {
 433  433          lwb_t *lwb;
 434  434  
 435  435          lwb = kmem_cache_alloc(zil_lwb_cache, KM_SLEEP);
 436  436          lwb->lwb_zilog = zilog;
 437  437          lwb->lwb_blk = *bp;
 438  438          lwb->lwb_buf = zio_buf_alloc(BP_GET_LSIZE(bp));
 439  439          lwb->lwb_max_txg = txg;
 440  440          lwb->lwb_zio = NULL;
 441  441          lwb->lwb_tx = NULL;
 442  442          if (BP_GET_CHECKSUM(bp) == ZIO_CHECKSUM_ZILOG2) {
 443  443                  lwb->lwb_nused = sizeof (zil_chain_t);
 444  444                  lwb->lwb_sz = BP_GET_LSIZE(bp);
 445  445          } else {
 446  446                  lwb->lwb_nused = 0;
 447  447                  lwb->lwb_sz = BP_GET_LSIZE(bp) - sizeof (zil_chain_t);
 448  448          }
 449  449  
 450  450          mutex_enter(&zilog->zl_lock);
 451  451          list_insert_tail(&zilog->zl_lwb_list, lwb);
 452  452          mutex_exit(&zilog->zl_lock);
 453  453  
 454  454          return (lwb);
 455  455  }
 456  456  
 457  457  /*
 458  458   * Create an on-disk intent log.
 459  459   */
 460  460  static lwb_t *
 461  461  zil_create(zilog_t *zilog)
 462  462  {
 463  463          const zil_header_t *zh = zilog->zl_header;
 464  464          lwb_t *lwb = NULL;
 465  465          uint64_t txg = 0;
 466  466          dmu_tx_t *tx = NULL;
 467  467          blkptr_t blk;
 468  468          int error = 0;
 469  469  
 470  470          /*
 471  471           * Wait for any previous destroy to complete.
 472  472           */
 473  473          txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
 474  474  
 475  475          ASSERT(zh->zh_claim_txg == 0);
 476  476          ASSERT(zh->zh_replay_seq == 0);
 477  477  
 478  478          blk = zh->zh_log;
 479  479  
 480  480          /*
 481  481           * Allocate an initial log block if:
 482  482           *    - there isn't one already
 483  483           *    - the existing block is the wrong endianess
 484  484           */
 485  485          if (BP_IS_HOLE(&blk) || BP_SHOULD_BYTESWAP(&blk)) {
 486  486                  tx = dmu_tx_create(zilog->zl_os);
 487  487                  VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
 488  488                  dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
 489  489                  txg = dmu_tx_get_txg(tx);
 490  490  
 491  491                  if (!BP_IS_HOLE(&blk)) {
 492  492                          zio_free_zil(zilog->zl_spa, txg, &blk);
 493  493                          BP_ZERO(&blk);
 494  494                  }
 495  495  
 496  496                  error = zio_alloc_zil(zilog->zl_spa, txg, &blk, NULL,
 497  497                      ZIL_MIN_BLKSZ, zilog->zl_logbias == ZFS_LOGBIAS_LATENCY);
 498  498  
 499  499                  if (error == 0)
 500  500                          zil_init_log_chain(zilog, &blk);
 501  501          }
 502  502  
 503  503          /*
 504  504           * Allocate a log write buffer (lwb) for the first log block.
 505  505           */
 506  506          if (error == 0)
 507  507                  lwb = zil_alloc_lwb(zilog, &blk, txg);
 508  508  
 509  509          /*
 510  510           * If we just allocated the first log block, commit our transaction
 511  511           * and wait for zil_sync() to stuff the block poiner into zh_log.
 512  512           * (zh is part of the MOS, so we cannot modify it in open context.)
 513  513           */
 514  514          if (tx != NULL) {
 515  515                  dmu_tx_commit(tx);
 516  516                  txg_wait_synced(zilog->zl_dmu_pool, txg);
 517  517          }
 518  518  
 519  519          ASSERT(bcmp(&blk, &zh->zh_log, sizeof (blk)) == 0);
 520  520  
 521  521          return (lwb);
 522  522  }
 523  523  
 524  524  /*
 525  525   * In one tx, free all log blocks and clear the log header.
 526  526   * If keep_first is set, then we're replaying a log with no content.
 527  527   * We want to keep the first block, however, so that the first
 528  528   * synchronous transaction doesn't require a txg_wait_synced()
 529  529   * in zil_create().  We don't need to txg_wait_synced() here either
 530  530   * when keep_first is set, because both zil_create() and zil_destroy()
 531  531   * will wait for any in-progress destroys to complete.
 532  532   */
 533  533  void
 534  534  zil_destroy(zilog_t *zilog, boolean_t keep_first)
 535  535  {
 536  536          const zil_header_t *zh = zilog->zl_header;
 537  537          lwb_t *lwb;
 538  538          dmu_tx_t *tx;
 539  539          uint64_t txg;
 540  540  
 541  541          /*
 542  542           * Wait for any previous destroy to complete.
 543  543           */
 544  544          txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
 545  545  
 546  546          zilog->zl_old_header = *zh;             /* debugging aid */
 547  547  
 548  548          if (BP_IS_HOLE(&zh->zh_log))
 549  549                  return;
 550  550  
 551  551          tx = dmu_tx_create(zilog->zl_os);
 552  552          VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
 553  553          dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
 554  554          txg = dmu_tx_get_txg(tx);
 555  555  
 556  556          mutex_enter(&zilog->zl_lock);
 557  557  
 558  558          ASSERT3U(zilog->zl_destroy_txg, <, txg);
 559  559          zilog->zl_destroy_txg = txg;
 560  560          zilog->zl_keep_first = keep_first;
 561  561  
 562  562          if (!list_is_empty(&zilog->zl_lwb_list)) {
 563  563                  ASSERT(zh->zh_claim_txg == 0);
 564  564                  VERIFY(!keep_first);
 565  565                  while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
 566  566                          list_remove(&zilog->zl_lwb_list, lwb);
 567  567                          if (lwb->lwb_buf != NULL)
 568  568                                  zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
 569  569                          zio_free_zil(zilog->zl_spa, txg, &lwb->lwb_blk);
 570  570                          kmem_cache_free(zil_lwb_cache, lwb);
 571  571                  }
 572  572          } else if (!keep_first) {
 573  573                  (void) zil_parse(zilog, zil_free_log_block,
 574  574                      zil_free_log_record, tx, zh->zh_claim_txg);
 575  575          }
 576  576          mutex_exit(&zilog->zl_lock);
 577  577  
 578  578          dmu_tx_commit(tx);
 579  579  }
 580  580  
 581  581  int
 582  582  zil_claim(const char *osname, void *txarg)
 583  583  {
 584  584          dmu_tx_t *tx = txarg;
 585  585          uint64_t first_txg = dmu_tx_get_txg(tx);
 586  586          zilog_t *zilog;
 587  587          zil_header_t *zh;
 588  588          objset_t *os;
 589  589          int error;
 590  590  
 591  591          error = dmu_objset_hold(osname, FTAG, &os);
 592  592          if (error) {
 593  593                  cmn_err(CE_WARN, "can't open objset for %s", osname);
 594  594                  return (0);
 595  595          }
 596  596  
 597  597          zilog = dmu_objset_zil(os);
 598  598          zh = zil_header_in_syncing_context(zilog);
 599  599  
 600  600          if (spa_get_log_state(zilog->zl_spa) == SPA_LOG_CLEAR) {
 601  601                  if (!BP_IS_HOLE(&zh->zh_log))
 602  602                          zio_free_zil(zilog->zl_spa, first_txg, &zh->zh_log);
 603  603                  BP_ZERO(&zh->zh_log);
 604  604                  dsl_dataset_dirty(dmu_objset_ds(os), tx);
 605  605                  dmu_objset_rele(os, FTAG);
 606  606                  return (0);
 607  607          }
 608  608  
 609  609          /*
 610  610           * Claim all log blocks if we haven't already done so, and remember
 611  611           * the highest claimed sequence number.  This ensures that if we can
 612  612           * read only part of the log now (e.g. due to a missing device),
 613  613           * but we can read the entire log later, we will not try to replay
 614  614           * or destroy beyond the last block we successfully claimed.
 615  615           */
 616  616          ASSERT3U(zh->zh_claim_txg, <=, first_txg);
 617  617          if (zh->zh_claim_txg == 0 && !BP_IS_HOLE(&zh->zh_log)) {
 618  618                  (void) zil_parse(zilog, zil_claim_log_block,
 619  619                      zil_claim_log_record, tx, first_txg);
 620  620                  zh->zh_claim_txg = first_txg;
 621  621                  zh->zh_claim_blk_seq = zilog->zl_parse_blk_seq;
 622  622                  zh->zh_claim_lr_seq = zilog->zl_parse_lr_seq;
 623  623                  if (zilog->zl_parse_lr_count || zilog->zl_parse_blk_count > 1)
 624  624                          zh->zh_flags |= ZIL_REPLAY_NEEDED;
 625  625                  zh->zh_flags |= ZIL_CLAIM_LR_SEQ_VALID;
 626  626                  dsl_dataset_dirty(dmu_objset_ds(os), tx);
 627  627          }
 628  628  
 629  629          ASSERT3U(first_txg, ==, (spa_last_synced_txg(zilog->zl_spa) + 1));
 630  630          dmu_objset_rele(os, FTAG);
 631  631          return (0);
 632  632  }
 633  633  
 634  634  /*
 635  635   * Check the log by walking the log chain.
 636  636   * Checksum errors are ok as they indicate the end of the chain.
 637  637   * Any other error (no device or read failure) returns an error.
 638  638   */
 639  639  int
 640  640  zil_check_log_chain(const char *osname, void *tx)
 641  641  {
 642  642          zilog_t *zilog;
 643  643          objset_t *os;
 644  644          blkptr_t *bp;
 645  645          int error;
 646  646  
 647  647          ASSERT(tx == NULL);
 648  648  
 649  649          error = dmu_objset_hold(osname, FTAG, &os);
 650  650          if (error) {
 651  651                  cmn_err(CE_WARN, "can't open objset for %s", osname);
 652  652                  return (0);
 653  653          }
 654  654  
 655  655          zilog = dmu_objset_zil(os);
 656  656          bp = (blkptr_t *)&zilog->zl_header->zh_log;
 657  657  
 658  658          /*
 659  659           * Check the first block and determine if it's on a log device
 660  660           * which may have been removed or faulted prior to loading this
 661  661           * pool.  If so, there's no point in checking the rest of the log
 662  662           * as its content should have already been synced to the pool.
 663  663           */
 664  664          if (!BP_IS_HOLE(bp)) {
 665  665                  vdev_t *vd;
 666  666                  boolean_t valid = B_TRUE;
 667  667  
 668  668                  spa_config_enter(os->os_spa, SCL_STATE, FTAG, RW_READER);
 669  669                  vd = vdev_lookup_top(os->os_spa, DVA_GET_VDEV(&bp->blk_dva[0]));
 670  670                  if (vd->vdev_islog && vdev_is_dead(vd))
 671  671                          valid = vdev_log_state_valid(vd);
 672  672                  spa_config_exit(os->os_spa, SCL_STATE, FTAG);
 673  673  
 674  674                  if (!valid) {
 675  675                          dmu_objset_rele(os, FTAG);
 676  676                          return (0);
 677  677                  }
 678  678          }
 679  679  
 680  680          /*
 681  681           * Because tx == NULL, zil_claim_log_block() will not actually claim
 682  682           * any blocks, but just determine whether it is possible to do so.
 683  683           * In addition to checking the log chain, zil_claim_log_block()
 684  684           * will invoke zio_claim() with a done func of spa_claim_notify(),
 685  685           * which will update spa_max_claim_txg.  See spa_load() for details.
 686  686           */
 687  687          error = zil_parse(zilog, zil_claim_log_block, zil_claim_log_record, tx,
 688  688              zilog->zl_header->zh_claim_txg ? -1ULL : spa_first_txg(os->os_spa));
 689  689  
 690  690          dmu_objset_rele(os, FTAG);
 691  691  
 692  692          return ((error == ECKSUM || error == ENOENT) ? 0 : error);
 693  693  }
 694  694  
 695  695  static int
 696  696  zil_vdev_compare(const void *x1, const void *x2)
 697  697  {
 698  698          const uint64_t v1 = ((zil_vdev_node_t *)x1)->zv_vdev;
 699  699          const uint64_t v2 = ((zil_vdev_node_t *)x2)->zv_vdev;
 700  700  
 701  701          if (v1 < v2)
 702  702                  return (-1);
 703  703          if (v1 > v2)
 704  704                  return (1);
 705  705  
 706  706          return (0);
 707  707  }
 708  708  
 709  709  void
 710  710  zil_add_block(zilog_t *zilog, const blkptr_t *bp)
 711  711  {
 712  712          avl_tree_t *t = &zilog->zl_vdev_tree;
 713  713          avl_index_t where;
 714  714          zil_vdev_node_t *zv, zvsearch;
 715  715          int ndvas = BP_GET_NDVAS(bp);
 716  716          int i;
 717  717  
 718  718          if (zfs_nocacheflush)
 719  719                  return;
 720  720  
 721  721          ASSERT(zilog->zl_writer);
 722  722  
 723  723          /*
 724  724           * Even though we're zl_writer, we still need a lock because the
 725  725           * zl_get_data() callbacks may have dmu_sync() done callbacks
 726  726           * that will run concurrently.
 727  727           */
 728  728          mutex_enter(&zilog->zl_vdev_lock);
 729  729          for (i = 0; i < ndvas; i++) {
 730  730                  zvsearch.zv_vdev = DVA_GET_VDEV(&bp->blk_dva[i]);
 731  731                  if (avl_find(t, &zvsearch, &where) == NULL) {
 732  732                          zv = kmem_alloc(sizeof (*zv), KM_SLEEP);
 733  733                          zv->zv_vdev = zvsearch.zv_vdev;
 734  734                          avl_insert(t, zv, where);
 735  735                  }
 736  736          }
 737  737          mutex_exit(&zilog->zl_vdev_lock);
 738  738  }
 739  739  
 740  740  static void
 741  741  zil_flush_vdevs(zilog_t *zilog)
 742  742  {
 743  743          spa_t *spa = zilog->zl_spa;
 744  744          avl_tree_t *t = &zilog->zl_vdev_tree;
 745  745          void *cookie = NULL;
 746  746          zil_vdev_node_t *zv;
 747  747          zio_t *zio;
 748  748  
 749  749          ASSERT(zilog->zl_writer);
 750  750  
 751  751          /*
 752  752           * We don't need zl_vdev_lock here because we're the zl_writer,
 753  753           * and all zl_get_data() callbacks are done.
 754  754           */
 755  755          if (avl_numnodes(t) == 0)
 756  756                  return;
 757  757  
 758  758          spa_config_enter(spa, SCL_STATE, FTAG, RW_READER);
 759  759  
 760  760          zio = zio_root(spa, NULL, NULL, ZIO_FLAG_CANFAIL);
 761  761  
 762  762          while ((zv = avl_destroy_nodes(t, &cookie)) != NULL) {
 763  763                  vdev_t *vd = vdev_lookup_top(spa, zv->zv_vdev);
 764  764                  if (vd != NULL)
 765  765                          zio_flush(zio, vd);
 766  766                  kmem_free(zv, sizeof (*zv));
 767  767          }
 768  768  
 769  769          /*
 770  770           * Wait for all the flushes to complete.  Not all devices actually
 771  771           * support the DKIOCFLUSHWRITECACHE ioctl, so it's OK if it fails.
 772  772           */
 773  773          (void) zio_wait(zio);
 774  774  
 775  775          spa_config_exit(spa, SCL_STATE, FTAG);
 776  776  }
 777  777  
 778  778  /*
 779  779   * Function called when a log block write completes
 780  780   */
 781  781  static void
 782  782  zil_lwb_write_done(zio_t *zio)
 783  783  {
 784  784          lwb_t *lwb = zio->io_private;
 785  785          zilog_t *zilog = lwb->lwb_zilog;
 786  786          dmu_tx_t *tx = lwb->lwb_tx;
 787  787  
 788  788          ASSERT(BP_GET_COMPRESS(zio->io_bp) == ZIO_COMPRESS_OFF);
 789  789          ASSERT(BP_GET_TYPE(zio->io_bp) == DMU_OT_INTENT_LOG);
 790  790          ASSERT(BP_GET_LEVEL(zio->io_bp) == 0);
 791  791          ASSERT(BP_GET_BYTEORDER(zio->io_bp) == ZFS_HOST_BYTEORDER);
 792  792          ASSERT(!BP_IS_GANG(zio->io_bp));
 793  793          ASSERT(!BP_IS_HOLE(zio->io_bp));
 794  794          ASSERT(zio->io_bp->blk_fill == 0);
 795  795  
 796  796          /*
 797  797           * Ensure the lwb buffer pointer is cleared before releasing
 798  798           * the txg. If we have had an allocation failure and
 799  799           * the txg is waiting to sync then we want zil_sync()
 800  800           * to remove the lwb so that it's not picked up as the next new
 801  801           * one in zil_commit_writer(). zil_sync() will only remove
 802  802           * the lwb if lwb_buf is null.
 803  803           */
 804  804          zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
 805  805          mutex_enter(&zilog->zl_lock);
 806  806          lwb->lwb_buf = NULL;
 807  807          lwb->lwb_tx = NULL;
 808  808          mutex_exit(&zilog->zl_lock);
 809  809  
 810  810          /*
 811  811           * Now that we've written this log block, we have a stable pointer
 812  812           * to the next block in the chain, so it's OK to let the txg in
 813  813           * which we allocated the next block sync.
 814  814           */
 815  815          dmu_tx_commit(tx);
 816  816  }
 817  817  
 818  818  /*
 819  819   * Initialize the io for a log block.
 820  820   */
 821  821  static void
 822  822  zil_lwb_write_init(zilog_t *zilog, lwb_t *lwb)
 823  823  {
 824  824          zbookmark_t zb;
 825  825  
 826  826          SET_BOOKMARK(&zb, lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_OBJSET],
 827  827              ZB_ZIL_OBJECT, ZB_ZIL_LEVEL,
 828  828              lwb->lwb_blk.blk_cksum.zc_word[ZIL_ZC_SEQ]);
 829  829  
 830  830          if (zilog->zl_root_zio == NULL) {
 831  831                  zilog->zl_root_zio = zio_root(zilog->zl_spa, NULL, NULL,
 832  832                      ZIO_FLAG_CANFAIL);
 833  833          }
 834  834          if (lwb->lwb_zio == NULL) {
 835  835                  lwb->lwb_zio = zio_rewrite(zilog->zl_root_zio, zilog->zl_spa,
 836  836                      0, &lwb->lwb_blk, lwb->lwb_buf, BP_GET_LSIZE(&lwb->lwb_blk),
 837  837                      zil_lwb_write_done, lwb, ZIO_PRIORITY_LOG_WRITE,
 838  838                      ZIO_FLAG_CANFAIL | ZIO_FLAG_DONT_PROPAGATE, &zb);
 839  839          }
 840  840  }
 841  841  
 842  842  /*
 843  843   * Define a limited set of intent log block sizes.
 844  844   * These must be a multiple of 4KB. Note only the amount used (again
 845  845   * aligned to 4KB) actually gets written. However, we can't always just
 846  846   * allocate SPA_MAXBLOCKSIZE as the slog space could be exhausted.
 847  847   */
 848  848  uint64_t zil_block_buckets[] = {
 849  849      4096,               /* non TX_WRITE */
 850  850      8192+4096,          /* data base */
 851  851      32*1024 + 4096,     /* NFS writes */
 852  852      UINT64_MAX
 853  853  };
 854  854  
 855  855  /*
 856  856   * Use the slog as long as the logbias is 'latency' and the current commit size
 857  857   * is less than the limit or the total list size is less than 2X the limit.
 858  858   * Limit checking is disabled by setting zil_slog_limit to UINT64_MAX.
 859  859   */
 860  860  uint64_t zil_slog_limit = 1024 * 1024;
 861  861  #define USE_SLOG(zilog) (((zilog)->zl_logbias == ZFS_LOGBIAS_LATENCY) && \
 862  862          (((zilog)->zl_cur_used < zil_slog_limit) || \
 863  863          ((zilog)->zl_itx_list_sz < (zil_slog_limit << 1))))
 864  864  
 865  865  /*
 866  866   * Start a log block write and advance to the next log block.
 867  867   * Calls are serialized.
 868  868   */
 869  869  static lwb_t *
 870  870  zil_lwb_write_start(zilog_t *zilog, lwb_t *lwb)
 871  871  {
 872  872          lwb_t *nlwb = NULL;
 873  873          zil_chain_t *zilc;
 874  874          spa_t *spa = zilog->zl_spa;
 875  875          blkptr_t *bp;
 876  876          dmu_tx_t *tx;
 877  877          uint64_t txg;
 878  878          uint64_t zil_blksz, wsz;
 879  879          int i, error;
 880  880  
 881  881          if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) {
 882  882                  zilc = (zil_chain_t *)lwb->lwb_buf;
 883  883                  bp = &zilc->zc_next_blk;
 884  884          } else {
 885  885                  zilc = (zil_chain_t *)(lwb->lwb_buf + lwb->lwb_sz);
 886  886                  bp = &zilc->zc_next_blk;
 887  887          }
 888  888  
 889  889          ASSERT(lwb->lwb_nused <= lwb->lwb_sz);
 890  890  
 891  891          /*
 892  892           * Allocate the next block and save its address in this block
 893  893           * before writing it in order to establish the log chain.
 894  894           * Note that if the allocation of nlwb synced before we wrote
 895  895           * the block that points at it (lwb), we'd leak it if we crashed.
 896  896           * Therefore, we don't do dmu_tx_commit() until zil_lwb_write_done().
 897  897           * We dirty the dataset to ensure that zil_sync() will be called
 898  898           * to clean up in the event of allocation failure or I/O failure.
 899  899           */
 900  900          tx = dmu_tx_create(zilog->zl_os);
 901  901          VERIFY(dmu_tx_assign(tx, TXG_WAIT) == 0);
 902  902          dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
 903  903          txg = dmu_tx_get_txg(tx);
 904  904  
 905  905          lwb->lwb_tx = tx;
 906  906  
 907  907          /*
 908  908           * Log blocks are pre-allocated. Here we select the size of the next
 909  909           * block, based on size used in the last block.
 910  910           * - first find the smallest bucket that will fit the block from a
 911  911           *   limited set of block sizes. This is because it's faster to write
 912  912           *   blocks allocated from the same metaslab as they are adjacent or
 913  913           *   close.
 914  914           * - next find the maximum from the new suggested size and an array of
 915  915           *   previous sizes. This lessens a picket fence effect of wrongly
 916  916           *   guessing the size if we have a stream of say 2k, 64k, 2k, 64k
 917  917           *   requests.
 918  918           *
 919  919           * Note we only write what is used, but we can't just allocate
 920  920           * the maximum block size because we can exhaust the available
 921  921           * pool log space.
 922  922           */
 923  923          zil_blksz = zilog->zl_cur_used + sizeof (zil_chain_t);
 924  924          for (i = 0; zil_blksz > zil_block_buckets[i]; i++)
 925  925                  continue;
 926  926          zil_blksz = zil_block_buckets[i];
 927  927          if (zil_blksz == UINT64_MAX)
 928  928                  zil_blksz = SPA_MAXBLOCKSIZE;
 929  929          zilog->zl_prev_blks[zilog->zl_prev_rotor] = zil_blksz;
 930  930          for (i = 0; i < ZIL_PREV_BLKS; i++)
 931  931                  zil_blksz = MAX(zil_blksz, zilog->zl_prev_blks[i]);
 932  932          zilog->zl_prev_rotor = (zilog->zl_prev_rotor + 1) & (ZIL_PREV_BLKS - 1);
 933  933  
 934  934          BP_ZERO(bp);
 935  935          /* pass the old blkptr in order to spread log blocks across devs */
 936  936          error = zio_alloc_zil(spa, txg, bp, &lwb->lwb_blk, zil_blksz,
 937  937              USE_SLOG(zilog));
 938  938          if (!error) {
 939  939                  ASSERT3U(bp->blk_birth, ==, txg);
 940  940                  bp->blk_cksum = lwb->lwb_blk.blk_cksum;
 941  941                  bp->blk_cksum.zc_word[ZIL_ZC_SEQ]++;
 942  942  
 943  943                  /*
 944  944                   * Allocate a new log write buffer (lwb).
 945  945                   */
 946  946                  nlwb = zil_alloc_lwb(zilog, bp, txg);
 947  947  
 948  948                  /* Record the block for later vdev flushing */
 949  949                  zil_add_block(zilog, &lwb->lwb_blk);
 950  950          }
 951  951  
 952  952          if (BP_GET_CHECKSUM(&lwb->lwb_blk) == ZIO_CHECKSUM_ZILOG2) {
 953  953                  /* For Slim ZIL only write what is used. */
 954  954                  wsz = P2ROUNDUP_TYPED(lwb->lwb_nused, ZIL_MIN_BLKSZ, uint64_t);
 955  955                  ASSERT3U(wsz, <=, lwb->lwb_sz);
 956  956                  zio_shrink(lwb->lwb_zio, wsz);
 957  957  
 958  958          } else {
 959  959                  wsz = lwb->lwb_sz;
 960  960          }
 961  961  
 962  962          zilc->zc_pad = 0;
 963  963          zilc->zc_nused = lwb->lwb_nused;
 964  964          zilc->zc_eck.zec_cksum = lwb->lwb_blk.blk_cksum;
 965  965  
 966  966          /*
 967  967           * clear unused data for security
 968  968           */
 969  969          bzero(lwb->lwb_buf + lwb->lwb_nused, wsz - lwb->lwb_nused);
 970  970  
 971  971          zio_nowait(lwb->lwb_zio); /* Kick off the write for the old log block */
 972  972  
 973  973          /*
 974  974           * If there was an allocation failure then nlwb will be null which
 975  975           * forces a txg_wait_synced().
 976  976           */
 977  977          return (nlwb);
 978  978  }
 979  979  
 980  980  static lwb_t *
 981  981  zil_lwb_commit(zilog_t *zilog, itx_t *itx, lwb_t *lwb)
 982  982  {
 983  983          lr_t *lrc = &itx->itx_lr; /* common log record */
 984  984          lr_write_t *lrw = (lr_write_t *)lrc;
 985  985          char *lr_buf;
 986  986          uint64_t txg = lrc->lrc_txg;
 987  987          uint64_t reclen = lrc->lrc_reclen;
 988  988          uint64_t dlen = 0;
 989  989  
 990  990          if (lwb == NULL)
 991  991                  return (NULL);
 992  992  
 993  993          ASSERT(lwb->lwb_buf != NULL);
 994  994  
 995  995          if (lrc->lrc_txtype == TX_WRITE && itx->itx_wr_state == WR_NEED_COPY)
 996  996                  dlen = P2ROUNDUP_TYPED(
 997  997                      lrw->lr_length, sizeof (uint64_t), uint64_t);
 998  998  
 999  999          zilog->zl_cur_used += (reclen + dlen);
1000 1000  
1001 1001          zil_lwb_write_init(zilog, lwb);
1002 1002  
1003 1003          /*
1004 1004           * If this record won't fit in the current log block, start a new one.
1005 1005           */
1006 1006          if (lwb->lwb_nused + reclen + dlen > lwb->lwb_sz) {
1007 1007                  lwb = zil_lwb_write_start(zilog, lwb);
1008 1008                  if (lwb == NULL)
1009 1009                          return (NULL);
1010 1010                  zil_lwb_write_init(zilog, lwb);
1011 1011                  ASSERT(LWB_EMPTY(lwb));
1012 1012                  if (lwb->lwb_nused + reclen + dlen > lwb->lwb_sz) {
1013 1013                          txg_wait_synced(zilog->zl_dmu_pool, txg);
1014 1014                          return (lwb);
1015 1015                  }
1016 1016          }
1017 1017  
1018 1018          lr_buf = lwb->lwb_buf + lwb->lwb_nused;
1019 1019          bcopy(lrc, lr_buf, reclen);
1020 1020          lrc = (lr_t *)lr_buf;
1021 1021          lrw = (lr_write_t *)lrc;
1022 1022  
1023 1023          /*
1024 1024           * If it's a write, fetch the data or get its blkptr as appropriate.
1025 1025           */
1026 1026          if (lrc->lrc_txtype == TX_WRITE) {
1027 1027                  if (txg > spa_freeze_txg(zilog->zl_spa))
1028 1028                          txg_wait_synced(zilog->zl_dmu_pool, txg);
1029 1029                  if (itx->itx_wr_state != WR_COPIED) {
1030 1030                          char *dbuf;
1031 1031                          int error;
1032 1032  
1033 1033                          if (dlen) {
1034 1034                                  ASSERT(itx->itx_wr_state == WR_NEED_COPY);
1035 1035                                  dbuf = lr_buf + reclen;
1036 1036                                  lrw->lr_common.lrc_reclen += dlen;
1037 1037                          } else {
1038 1038                                  ASSERT(itx->itx_wr_state == WR_INDIRECT);
1039 1039                                  dbuf = NULL;
1040 1040                          }
1041 1041                          error = zilog->zl_get_data(
1042 1042                              itx->itx_private, lrw, dbuf, lwb->lwb_zio);
1043 1043                          if (error == EIO) {
1044 1044                                  txg_wait_synced(zilog->zl_dmu_pool, txg);
1045 1045                                  return (lwb);
1046 1046                          }
1047 1047                          if (error) {
1048 1048                                  ASSERT(error == ENOENT || error == EEXIST ||
1049 1049                                      error == EALREADY);
1050 1050                                  return (lwb);
1051 1051                          }
1052 1052                  }
1053 1053          }
1054 1054  
  
    | 
      ↓ open down ↓ | 
    1021 lines elided | 
    
      ↑ open up ↑ | 
  
1055 1055          /*
1056 1056           * We're actually making an entry, so update lrc_seq to be the
1057 1057           * log record sequence number.  Note that this is generally not
1058 1058           * equal to the itx sequence number because not all transactions
1059 1059           * are synchronous, and sometimes spa_sync() gets there first.
1060 1060           */
1061 1061          lrc->lrc_seq = ++zilog->zl_lr_seq; /* we are single threaded */
1062 1062          lwb->lwb_nused += reclen + dlen;
1063 1063          lwb->lwb_max_txg = MAX(lwb->lwb_max_txg, txg);
1064 1064          ASSERT3U(lwb->lwb_nused, <=, lwb->lwb_sz);
1065      -        ASSERT3U(P2PHASE(lwb->lwb_nused, sizeof (uint64_t)), ==, 0);
     1065 +        ASSERT0(P2PHASE(lwb->lwb_nused, sizeof (uint64_t)));
1066 1066  
1067 1067          return (lwb);
1068 1068  }
1069 1069  
1070 1070  itx_t *
1071 1071  zil_itx_create(uint64_t txtype, size_t lrsize)
1072 1072  {
1073 1073          itx_t *itx;
1074 1074  
1075 1075          lrsize = P2ROUNDUP_TYPED(lrsize, sizeof (uint64_t), size_t);
1076 1076  
1077 1077          itx = kmem_alloc(offsetof(itx_t, itx_lr) + lrsize, KM_SLEEP);
1078 1078          itx->itx_lr.lrc_txtype = txtype;
1079 1079          itx->itx_lr.lrc_reclen = lrsize;
1080 1080          itx->itx_sod = lrsize; /* if write & WR_NEED_COPY will be increased */
1081 1081          itx->itx_lr.lrc_seq = 0;        /* defensive */
1082 1082          itx->itx_sync = B_TRUE;         /* default is synchronous */
1083 1083  
1084 1084          return (itx);
1085 1085  }
1086 1086  
1087 1087  void
1088 1088  zil_itx_destroy(itx_t *itx)
1089 1089  {
1090 1090          kmem_free(itx, offsetof(itx_t, itx_lr) + itx->itx_lr.lrc_reclen);
1091 1091  }
1092 1092  
1093 1093  /*
1094 1094   * Free up the sync and async itxs. The itxs_t has already been detached
1095 1095   * so no locks are needed.
1096 1096   */
1097 1097  static void
1098 1098  zil_itxg_clean(itxs_t *itxs)
1099 1099  {
1100 1100          itx_t *itx;
1101 1101          list_t *list;
1102 1102          avl_tree_t *t;
1103 1103          void *cookie;
1104 1104          itx_async_node_t *ian;
1105 1105  
1106 1106          list = &itxs->i_sync_list;
1107 1107          while ((itx = list_head(list)) != NULL) {
1108 1108                  list_remove(list, itx);
1109 1109                  kmem_free(itx, offsetof(itx_t, itx_lr) +
1110 1110                      itx->itx_lr.lrc_reclen);
1111 1111          }
1112 1112  
1113 1113          cookie = NULL;
1114 1114          t = &itxs->i_async_tree;
1115 1115          while ((ian = avl_destroy_nodes(t, &cookie)) != NULL) {
1116 1116                  list = &ian->ia_list;
1117 1117                  while ((itx = list_head(list)) != NULL) {
1118 1118                          list_remove(list, itx);
1119 1119                          kmem_free(itx, offsetof(itx_t, itx_lr) +
1120 1120                              itx->itx_lr.lrc_reclen);
1121 1121                  }
1122 1122                  list_destroy(list);
1123 1123                  kmem_free(ian, sizeof (itx_async_node_t));
1124 1124          }
1125 1125          avl_destroy(t);
1126 1126  
1127 1127          kmem_free(itxs, sizeof (itxs_t));
1128 1128  }
1129 1129  
1130 1130  static int
1131 1131  zil_aitx_compare(const void *x1, const void *x2)
1132 1132  {
1133 1133          const uint64_t o1 = ((itx_async_node_t *)x1)->ia_foid;
1134 1134          const uint64_t o2 = ((itx_async_node_t *)x2)->ia_foid;
1135 1135  
1136 1136          if (o1 < o2)
1137 1137                  return (-1);
1138 1138          if (o1 > o2)
1139 1139                  return (1);
1140 1140  
1141 1141          return (0);
1142 1142  }
1143 1143  
1144 1144  /*
1145 1145   * Remove all async itx with the given oid.
1146 1146   */
1147 1147  static void
1148 1148  zil_remove_async(zilog_t *zilog, uint64_t oid)
1149 1149  {
1150 1150          uint64_t otxg, txg;
1151 1151          itx_async_node_t *ian;
1152 1152          avl_tree_t *t;
1153 1153          avl_index_t where;
1154 1154          list_t clean_list;
1155 1155          itx_t *itx;
1156 1156  
1157 1157          ASSERT(oid != 0);
1158 1158          list_create(&clean_list, sizeof (itx_t), offsetof(itx_t, itx_node));
1159 1159  
1160 1160          if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX) /* ziltest support */
1161 1161                  otxg = ZILTEST_TXG;
1162 1162          else
1163 1163                  otxg = spa_last_synced_txg(zilog->zl_spa) + 1;
1164 1164  
1165 1165          for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
1166 1166                  itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];
1167 1167  
1168 1168                  mutex_enter(&itxg->itxg_lock);
1169 1169                  if (itxg->itxg_txg != txg) {
1170 1170                          mutex_exit(&itxg->itxg_lock);
1171 1171                          continue;
1172 1172                  }
1173 1173  
1174 1174                  /*
1175 1175                   * Locate the object node and append its list.
1176 1176                   */
1177 1177                  t = &itxg->itxg_itxs->i_async_tree;
1178 1178                  ian = avl_find(t, &oid, &where);
1179 1179                  if (ian != NULL)
1180 1180                          list_move_tail(&clean_list, &ian->ia_list);
1181 1181                  mutex_exit(&itxg->itxg_lock);
1182 1182          }
1183 1183          while ((itx = list_head(&clean_list)) != NULL) {
1184 1184                  list_remove(&clean_list, itx);
1185 1185                  kmem_free(itx, offsetof(itx_t, itx_lr) +
1186 1186                      itx->itx_lr.lrc_reclen);
1187 1187          }
1188 1188          list_destroy(&clean_list);
1189 1189  }
1190 1190  
1191 1191  void
1192 1192  zil_itx_assign(zilog_t *zilog, itx_t *itx, dmu_tx_t *tx)
1193 1193  {
1194 1194          uint64_t txg;
1195 1195          itxg_t *itxg;
1196 1196          itxs_t *itxs, *clean = NULL;
1197 1197  
1198 1198          /*
1199 1199           * Object ids can be re-instantiated in the next txg so
1200 1200           * remove any async transactions to avoid future leaks.
1201 1201           * This can happen if an fsync occurs on the re-instantiated
1202 1202           * object for a WR_INDIRECT or WR_NEED_COPY write, which gets
1203 1203           * the new file data and flushes a write record for the old object.
1204 1204           */
1205 1205          if ((itx->itx_lr.lrc_txtype & ~TX_CI) == TX_REMOVE)
1206 1206                  zil_remove_async(zilog, itx->itx_oid);
1207 1207  
1208 1208          /*
1209 1209           * Ensure the data of a renamed file is committed before the rename.
1210 1210           */
1211 1211          if ((itx->itx_lr.lrc_txtype & ~TX_CI) == TX_RENAME)
1212 1212                  zil_async_to_sync(zilog, itx->itx_oid);
1213 1213  
1214 1214          if (spa_freeze_txg(zilog->zl_spa) !=  UINT64_MAX)
1215 1215                  txg = ZILTEST_TXG;
1216 1216          else
1217 1217                  txg = dmu_tx_get_txg(tx);
1218 1218  
1219 1219          itxg = &zilog->zl_itxg[txg & TXG_MASK];
1220 1220          mutex_enter(&itxg->itxg_lock);
1221 1221          itxs = itxg->itxg_itxs;
1222 1222          if (itxg->itxg_txg != txg) {
1223 1223                  if (itxs != NULL) {
1224 1224                          /*
1225 1225                           * The zil_clean callback hasn't got around to cleaning
1226 1226                           * this itxg. Save the itxs for release below.
1227 1227                           * This should be rare.
1228 1228                           */
1229 1229                          atomic_add_64(&zilog->zl_itx_list_sz, -itxg->itxg_sod);
1230 1230                          itxg->itxg_sod = 0;
1231 1231                          clean = itxg->itxg_itxs;
1232 1232                  }
1233 1233                  ASSERT(itxg->itxg_sod == 0);
1234 1234                  itxg->itxg_txg = txg;
1235 1235                  itxs = itxg->itxg_itxs = kmem_zalloc(sizeof (itxs_t), KM_SLEEP);
1236 1236  
1237 1237                  list_create(&itxs->i_sync_list, sizeof (itx_t),
1238 1238                      offsetof(itx_t, itx_node));
1239 1239                  avl_create(&itxs->i_async_tree, zil_aitx_compare,
1240 1240                      sizeof (itx_async_node_t),
1241 1241                      offsetof(itx_async_node_t, ia_node));
1242 1242          }
1243 1243          if (itx->itx_sync) {
1244 1244                  list_insert_tail(&itxs->i_sync_list, itx);
1245 1245                  atomic_add_64(&zilog->zl_itx_list_sz, itx->itx_sod);
1246 1246                  itxg->itxg_sod += itx->itx_sod;
1247 1247          } else {
1248 1248                  avl_tree_t *t = &itxs->i_async_tree;
1249 1249                  uint64_t foid = ((lr_ooo_t *)&itx->itx_lr)->lr_foid;
1250 1250                  itx_async_node_t *ian;
1251 1251                  avl_index_t where;
1252 1252  
1253 1253                  ian = avl_find(t, &foid, &where);
1254 1254                  if (ian == NULL) {
1255 1255                          ian = kmem_alloc(sizeof (itx_async_node_t), KM_SLEEP);
1256 1256                          list_create(&ian->ia_list, sizeof (itx_t),
1257 1257                              offsetof(itx_t, itx_node));
1258 1258                          ian->ia_foid = foid;
1259 1259                          avl_insert(t, ian, where);
1260 1260                  }
1261 1261                  list_insert_tail(&ian->ia_list, itx);
1262 1262          }
1263 1263  
1264 1264          itx->itx_lr.lrc_txg = dmu_tx_get_txg(tx);
1265 1265          mutex_exit(&itxg->itxg_lock);
1266 1266  
1267 1267          /* Release the old itxs now we've dropped the lock */
1268 1268          if (clean != NULL)
1269 1269                  zil_itxg_clean(clean);
1270 1270  }
1271 1271  
1272 1272  /*
1273 1273   * If there are any in-memory intent log transactions which have now been
1274 1274   * synced then start up a taskq to free them.
1275 1275   */
1276 1276  void
1277 1277  zil_clean(zilog_t *zilog, uint64_t synced_txg)
1278 1278  {
1279 1279          itxg_t *itxg = &zilog->zl_itxg[synced_txg & TXG_MASK];
1280 1280          itxs_t *clean_me;
1281 1281  
1282 1282          mutex_enter(&itxg->itxg_lock);
1283 1283          if (itxg->itxg_itxs == NULL || itxg->itxg_txg == ZILTEST_TXG) {
1284 1284                  mutex_exit(&itxg->itxg_lock);
1285 1285                  return;
1286 1286          }
1287 1287          ASSERT3U(itxg->itxg_txg, <=, synced_txg);
1288 1288          ASSERT(itxg->itxg_txg != 0);
1289 1289          ASSERT(zilog->zl_clean_taskq != NULL);
1290 1290          atomic_add_64(&zilog->zl_itx_list_sz, -itxg->itxg_sod);
1291 1291          itxg->itxg_sod = 0;
1292 1292          clean_me = itxg->itxg_itxs;
1293 1293          itxg->itxg_itxs = NULL;
1294 1294          itxg->itxg_txg = 0;
1295 1295          mutex_exit(&itxg->itxg_lock);
1296 1296          /*
1297 1297           * Preferably start a task queue to free up the old itxs but
1298 1298           * if taskq_dispatch can't allocate resources to do that then
1299 1299           * free it in-line. This should be rare. Note, using TQ_SLEEP
1300 1300           * created a bad performance problem.
1301 1301           */
1302 1302          if (taskq_dispatch(zilog->zl_clean_taskq,
1303 1303              (void (*)(void *))zil_itxg_clean, clean_me, TQ_NOSLEEP) == NULL)
1304 1304                  zil_itxg_clean(clean_me);
1305 1305  }
1306 1306  
1307 1307  /*
1308 1308   * Get the list of itxs to commit into zl_itx_commit_list.
1309 1309   */
1310 1310  static void
1311 1311  zil_get_commit_list(zilog_t *zilog)
1312 1312  {
1313 1313          uint64_t otxg, txg;
1314 1314          list_t *commit_list = &zilog->zl_itx_commit_list;
1315 1315          uint64_t push_sod = 0;
1316 1316  
1317 1317          if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX) /* ziltest support */
1318 1318                  otxg = ZILTEST_TXG;
1319 1319          else
1320 1320                  otxg = spa_last_synced_txg(zilog->zl_spa) + 1;
1321 1321  
1322 1322          for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
1323 1323                  itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];
1324 1324  
1325 1325                  mutex_enter(&itxg->itxg_lock);
1326 1326                  if (itxg->itxg_txg != txg) {
1327 1327                          mutex_exit(&itxg->itxg_lock);
1328 1328                          continue;
1329 1329                  }
1330 1330  
1331 1331                  list_move_tail(commit_list, &itxg->itxg_itxs->i_sync_list);
1332 1332                  push_sod += itxg->itxg_sod;
1333 1333                  itxg->itxg_sod = 0;
1334 1334  
1335 1335                  mutex_exit(&itxg->itxg_lock);
1336 1336          }
1337 1337          atomic_add_64(&zilog->zl_itx_list_sz, -push_sod);
1338 1338  }
1339 1339  
1340 1340  /*
1341 1341   * Move the async itxs for a specified object to commit into sync lists.
1342 1342   */
1343 1343  static void
1344 1344  zil_async_to_sync(zilog_t *zilog, uint64_t foid)
1345 1345  {
1346 1346          uint64_t otxg, txg;
1347 1347          itx_async_node_t *ian;
1348 1348          avl_tree_t *t;
1349 1349          avl_index_t where;
1350 1350  
1351 1351          if (spa_freeze_txg(zilog->zl_spa) != UINT64_MAX) /* ziltest support */
1352 1352                  otxg = ZILTEST_TXG;
1353 1353          else
1354 1354                  otxg = spa_last_synced_txg(zilog->zl_spa) + 1;
1355 1355  
1356 1356          for (txg = otxg; txg < (otxg + TXG_CONCURRENT_STATES); txg++) {
1357 1357                  itxg_t *itxg = &zilog->zl_itxg[txg & TXG_MASK];
1358 1358  
1359 1359                  mutex_enter(&itxg->itxg_lock);
1360 1360                  if (itxg->itxg_txg != txg) {
1361 1361                          mutex_exit(&itxg->itxg_lock);
1362 1362                          continue;
1363 1363                  }
1364 1364  
1365 1365                  /*
1366 1366                   * If a foid is specified then find that node and append its
1367 1367                   * list. Otherwise walk the tree appending all the lists
1368 1368                   * to the sync list. We add to the end rather than the
1369 1369                   * beginning to ensure the create has happened.
1370 1370                   */
1371 1371                  t = &itxg->itxg_itxs->i_async_tree;
1372 1372                  if (foid != 0) {
1373 1373                          ian = avl_find(t, &foid, &where);
1374 1374                          if (ian != NULL) {
1375 1375                                  list_move_tail(&itxg->itxg_itxs->i_sync_list,
1376 1376                                      &ian->ia_list);
1377 1377                          }
1378 1378                  } else {
1379 1379                          void *cookie = NULL;
1380 1380  
1381 1381                          while ((ian = avl_destroy_nodes(t, &cookie)) != NULL) {
1382 1382                                  list_move_tail(&itxg->itxg_itxs->i_sync_list,
1383 1383                                      &ian->ia_list);
1384 1384                                  list_destroy(&ian->ia_list);
1385 1385                                  kmem_free(ian, sizeof (itx_async_node_t));
1386 1386                          }
1387 1387                  }
1388 1388                  mutex_exit(&itxg->itxg_lock);
1389 1389          }
1390 1390  }
1391 1391  
           /*
            * Drain zl_itx_commit_list after refreshing it with
            * zil_get_commit_list(): hand eligible itxs to zil_lwb_commit(), start
            * the last log block write, and wait for the root zio if there is one.
            *
            * Locking: entered with zl_lock held (zil_commit() takes it before
            * calling). The lock is dropped for the duration of the work and is
            * held again on every return path.
            */
1392 1392  static void
1393 1393  zil_commit_writer(zilog_t *zilog)
1394 1394  {
1395 1395          uint64_t txg;
1396 1396          itx_t *itx;
1397 1397          lwb_t *lwb;
1398 1398          spa_t *spa = zilog->zl_spa;
1399 1399          int error = 0;
1400 1400  
1401 1401          ASSERT(zilog->zl_root_zio == NULL);
1402 1402  
                   /* Drop zl_lock while committing; it is re-taken before returning. */
1403 1403          mutex_exit(&zilog->zl_lock);
1404 1404  
1405 1405          zil_get_commit_list(zilog);
1406 1406  
1407 1407          /*
1408 1408           * Return if there's nothing to commit before we dirty the fs by
1409 1409           * calling zil_create().
1410 1410           */
1411 1411          if (list_head(&zilog->zl_itx_commit_list) == NULL) {
1412 1412                  mutex_enter(&zilog->zl_lock);
1413 1413                  return;
1414 1414          }
1415 1415  
                   /*
                    * A suspended log gets no lwb here. Otherwise continue with the
                    * tail lwb, creating the log first if the lwb list is empty.
                    */
1416 1416          if (zilog->zl_suspend) {
1417 1417                  lwb = NULL;
1418 1418          } else {
1419 1419                  lwb = list_tail(&zilog->zl_lwb_list);
1420 1420                  if (lwb == NULL)
1421 1421                          lwb = zil_create(zilog);
1422 1422          }
1423 1423  
1424 1424          DTRACE_PROBE1(zil__cw1, zilog_t *, zilog);
1425 1425          while (itx = list_head(&zilog->zl_itx_commit_list)) {
1426 1426                  txg = itx->itx_lr.lrc_txg;
1427 1427                  ASSERT(txg);
1428 1428  
                           /*
                            * Every itx is removed from the commit list and freed.
                            * Only those whose txg is beyond the last synced txg, or
                            * beyond the freeze txg, go through zil_lwb_commit() first.
                            */
1429 1429                  if (txg > spa_last_synced_txg(spa) || txg > spa_freeze_txg(spa))
1430 1430                          lwb = zil_lwb_commit(zilog, itx, lwb);
1431 1431                  list_remove(&zilog->zl_itx_commit_list, itx);
                           /* Freed size: itx_t up to itx_lr plus the record length. */
1432 1432                  kmem_free(itx, offsetof(itx_t, itx_lr)
1433 1433                      + itx->itx_lr.lrc_reclen);
1434 1434          }
1435 1435          DTRACE_PROBE1(zil__cw2, zilog_t *, zilog);
1436 1436  
1437 1437          /* write the last block out */
1438 1438          if (lwb != NULL && lwb->lwb_zio != NULL)
1439 1439                  lwb = zil_lwb_write_start(zilog, lwb);
1440 1440  
1441 1441          zilog->zl_cur_used = 0;
1442 1442  
1443 1443          /*
1444 1444           * Wait if necessary for the log blocks to be on stable storage.
1445 1445           */
1446 1446          if (zilog->zl_root_zio) {
1447 1447                  error = zio_wait(zilog->zl_root_zio);
1448 1448                  zilog->zl_root_zio = NULL;
1449 1449                  zil_flush_vdevs(zilog);
1450 1450          }
1451 1451  
                   /*
                    * If the log write failed, or we ended up without an lwb (for
                    * instance because the log is suspended), wait on the txg
                    * instead.
                    */
1452 1452          if (error || lwb == NULL)
1453 1453                  txg_wait_synced(zilog->zl_dmu_pool, 0);
1454 1454  
1455 1455          mutex_enter(&zilog->zl_lock);
1456 1456  
1457 1457          /*
1458 1458           * Remember the highest committed log sequence number for ztest.
1459 1459           * We only update this value when all the log writes succeeded,
1460 1460           * because ztest wants to ASSERT that it got the whole log chain.
1461 1461           */
1462 1462          if (error == 0 && lwb != NULL)
1463 1463                  zilog->zl_commit_lr_seq = zilog->zl_lr_seq;
1464 1464  }
1465 1465  
1466 1466  /*
1467 1467   * Commit zfs transactions to stable storage.
1468 1468   * If foid is 0 push out all transactions, otherwise push only those
1469 1469   * for that object or that might reference that object.
1470 1470   *
1471 1471   * itxs are committed in batches. In a heavily stressed zil there will be
1472 1472   * a commit writer thread who is writing out a bunch of itxs to the log
1473 1473   * for a set of committing threads (cthreads) in the same batch as the writer.
1474 1474   * Those cthreads are all waiting on the same cv for that batch.
1475 1475   *
1476 1476   * There will also be a different and growing batch of threads that are
1477 1477   * waiting to commit (qthreads). When the committing batch completes
1478 1478   * a transition occurs such that the cthreads exit and the qthreads become
1479 1479   * cthreads. One of the new cthreads becomes the writer thread for the
1480 1480   * batch. Any new threads arriving become new qthreads.
1481 1481   *
1482 1482   * Only 2 condition variables are needed and there's no transition
1483 1483   * between the two cvs needed. They just flip-flop between qthreads
1484 1484   * and cthreads.
1485 1485   *
1486 1486   * Using this scheme we can efficiently wake up only those threads
1487 1487   * that have been committed.
1488 1488   */
1489 1489  void
1490 1490  zil_commit(zilog_t *zilog, uint64_t foid)
1491 1491  {
1492 1492          uint64_t mybatch;
1493 1493  
                   /* Nothing is committed when sync is disabled for this log. */
1494 1494          if (zilog->zl_sync == ZFS_SYNC_DISABLED)
1495 1495                  return;
1496 1496  
1497 1497          /* move the async itxs for the foid to the sync queues */
1498 1498          zil_async_to_sync(zilog, foid);
1499 1499  
1500 1500          mutex_enter(&zilog->zl_lock);
1501 1501          mybatch = zilog->zl_next_batch;
                   /*
                    * While another thread is the writer, wait on the cv selected by
                    * the low bit of our batch number. If our batch has already been
                    * committed when we wake up, there is nothing left to do.
                    */
1502 1502          while (zilog->zl_writer) {
1503 1503                  cv_wait(&zilog->zl_cv_batch[mybatch & 1], &zilog->zl_lock);
1504 1504                  if (mybatch <= zilog->zl_com_batch) {
1505 1505                          mutex_exit(&zilog->zl_lock);
1506 1506                          return;
1507 1507                  }
1508 1508          }
1509 1509  
                   /*
                    * No writer is active, so this thread becomes the writer for
                    * batch mybatch. zil_commit_writer() drops zl_lock while it
                    * works and returns with it held again.
                    */
1510 1510          zilog->zl_next_batch++;
1511 1511          zilog->zl_writer = B_TRUE;
1512 1512          zil_commit_writer(zilog);
1513 1513          zilog->zl_com_batch = mybatch;
1514 1514          zilog->zl_writer = B_FALSE;
1515 1515          mutex_exit(&zilog->zl_lock);
1516 1516  
1517 1517          /* wake up one thread to become the next writer */
1518 1518          cv_signal(&zilog->zl_cv_batch[(mybatch+1) & 1]);
1519 1519  
1520 1520          /* wake up all threads waiting for this batch to be committed */
1521 1521          cv_broadcast(&zilog->zl_cv_batch[mybatch & 1]);
1522 1522  }
1523 1523  
1524 1524  /*
1525 1525   * Called in syncing context to free committed log blocks and update log header.
            *
            * All work is done under zl_lock and only on the first sync pass of the
            * txg that tx belongs to.
1526 1526   */
1527 1527  void
1528 1528  zil_sync(zilog_t *zilog, dmu_tx_t *tx)
1529 1529  {
1530 1530          zil_header_t *zh = zil_header_in_syncing_context(zilog);
1531 1531          uint64_t txg = dmu_tx_get_txg(tx);
1532 1532          spa_t *spa = zilog->zl_spa;
                   /* Slot that zil_replaying() fills in for this txg. */
1533 1533          uint64_t *replayed_seq = &zilog->zl_replayed_seq[txg & TXG_MASK];
1534 1534          lwb_t *lwb;
1535 1535  
1536 1536          /*
1537 1537           * We don't zero out zl_destroy_txg, so make sure we don't try
1538 1538           * to destroy it twice.
1539 1539           */
1540 1540          if (spa_sync_pass(spa) != 1)
1541 1541                  return;
1542 1542  
1543 1543          mutex_enter(&zilog->zl_lock);
1544 1544  
                   /* zil_free() sets zl_stop_sync; no sync may arrive after that. */
1545 1545          ASSERT(zilog->zl_stop_sync == 0);
1546 1546  
                   /*
                    * A replayed sequence number was recorded for this txg: push it
                    * into the header and clear the slot for reuse.
                    */
1547 1547          if (*replayed_seq != 0) {
1548 1548                  ASSERT(zh->zh_replay_seq < *replayed_seq);
1549 1549                  zh->zh_replay_seq = *replayed_seq;
1550 1550                  *replayed_seq = 0;
1551 1551          }
1552 1552  
                   /*
                    * This txg is the recorded destroy txg: wipe the header and all
                    * replayed-seq slots. blk keeps a copy of the old log pointer
                    * for the zl_keep_first case below.
                    */
1553 1553          if (zilog->zl_destroy_txg == txg) {
1554 1554                  blkptr_t blk = zh->zh_log;
1555 1555  
1556 1556                  ASSERT(list_head(&zilog->zl_lwb_list) == NULL);
1557 1557  
1558 1558                  bzero(zh, sizeof (zil_header_t));
1559 1559                  bzero(zilog->zl_replayed_seq, sizeof (zilog->zl_replayed_seq));
1560 1560  
1561 1561                  if (zilog->zl_keep_first) {
1562 1562                          /*
1563 1563                           * If this block was part of log chain that couldn't
1564 1564                           * be claimed because a device was missing during
1565 1565                           * zil_claim(), but that device later returns,
1566 1566                           * then this block could erroneously appear valid.
1567 1567                           * To guard against this, assign a new GUID to the new
1568 1568                           * log chain so it doesn't matter what blk points to.
1569 1569                           */
1570 1570                          zil_init_log_chain(zilog, &blk);
1571 1571                          zh->zh_log = blk;
1572 1572                  }
1573 1573          }
1574 1574  
                   /*
                    * Walk the lwb list from the head, pointing the header at each
                    * lwb's block in turn. Stop at the first lwb that still has a
                    * buffer or whose max txg is beyond this txg; otherwise free its
                    * block and the lwb itself.
                    */
1575 1575          while ((lwb = list_head(&zilog->zl_lwb_list)) != NULL) {
1576 1576                  zh->zh_log = lwb->lwb_blk;
1577 1577                  if (lwb->lwb_buf != NULL || lwb->lwb_max_txg > txg)
1578 1578                          break;
1579 1579                  list_remove(&zilog->zl_lwb_list, lwb);
1580 1580                  zio_free_zil(spa, txg, &lwb->lwb_blk);
1581 1581                  kmem_cache_free(zil_lwb_cache, lwb);
1582 1582  
1583 1583                  /*
1584 1584                   * If we don't have anything left in the lwb list then
1585 1585                   * we've had an allocation failure and we need to zero
1586 1586                   * out the zil_header blkptr so that we don't end
1587 1587                   * up freeing the same block twice.
1588 1588                   */
1589 1589                  if (list_head(&zilog->zl_lwb_list) == NULL)
1590 1590                          BP_ZERO(&zh->zh_log);
1591 1591          }
1592 1592          mutex_exit(&zilog->zl_lock);
1593 1593  }
1594 1594  
           /*
            * Create the global kmem cache ("zil_lwb_cache") sized for struct lwb.
            * zil_fini() destroys it.
            */
1595 1595  void
1596 1596  zil_init(void)
1597 1597  {
1598 1598          zil_lwb_cache = kmem_cache_create("zil_lwb_cache",
1599 1599              sizeof (struct lwb), 0, NULL, NULL, NULL, NULL, NULL, 0);
1600 1600  }
1601 1601  
           /*
            * Destroy the lwb kmem cache created by zil_init().
            */
1602 1602  void
1603 1603  zil_fini(void)
1604 1604  {
1605 1605          kmem_cache_destroy(zil_lwb_cache);
1606 1606  }
1607 1607  
           /*
            * Record the sync setting for this log. zil_commit() and zil_replaying()
            * compare it against ZFS_SYNC_DISABLED. No lock is taken here.
            */
1608 1608  void
1609 1609  zil_set_sync(zilog_t *zilog, uint64_t sync)
1610 1610  {
1611 1611          zilog->zl_sync = sync;
1612 1612  }
1613 1613  
           /*
            * Record the logbias setting for this log. No lock is taken here.
            */
1614 1614  void
1615 1615  zil_set_logbias(zilog_t *zilog, uint64_t logbias)
1616 1616  {
1617 1617          zilog->zl_logbias = logbias;
1618 1618  }
1619 1619  
           /*
            * Allocate and initialize the in-memory zilog for an objset.
            *
            * os is the owning objset; the spa, dsl pool, logbias and sync settings
            * are all taken from it. zh_phys is stored as the log header pointer.
            * Returns the new zilog, which zil_free() tears down.
            */
1620 1620  zilog_t *
1621 1621  zil_alloc(objset_t *os, zil_header_t *zh_phys)
1622 1622  {
1623 1623          zilog_t *zilog;
1624 1624  
1625 1625          zilog = kmem_zalloc(sizeof (zilog_t), KM_SLEEP);
1626 1626  
1627 1627          zilog->zl_header = zh_phys;
1628 1628          zilog->zl_os = os;
1629 1629          zilog->zl_spa = dmu_objset_spa(os);
1630 1630          zilog->zl_dmu_pool = dmu_objset_pool(os);
                   /*
                    * NOTE(review): presumably chosen so that it never equals a txg
                    * seen by zil_sync() until a destroy sets it -- confirm.
                    */
1631 1631          zilog->zl_destroy_txg = TXG_INITIAL - 1;
1632 1632          zilog->zl_logbias = dmu_objset_logbias(os);
1633 1633          zilog->zl_sync = dmu_objset_syncprop(os);
                   /* Batch numbers start at 1; see the batching in zil_commit(). */
1634 1634          zilog->zl_next_batch = 1;
1635 1635  
1636 1636          mutex_init(&zilog->zl_lock, NULL, MUTEX_DEFAULT, NULL);
1637 1637  
                   /* One lock per per-txg itx slot. */
1638 1638          for (int i = 0; i < TXG_SIZE; i++) {
1639 1639                  mutex_init(&zilog->zl_itxg[i].itxg_lock, NULL,
1640 1640                      MUTEX_DEFAULT, NULL);
1641 1641          }
1642 1642  
1643 1643          list_create(&zilog->zl_lwb_list, sizeof (lwb_t),
1644 1644              offsetof(lwb_t, lwb_node));
1645 1645  
1646 1646          list_create(&zilog->zl_itx_commit_list, sizeof (itx_t),
1647 1647              offsetof(itx_t, itx_node));
1648 1648  
1649 1649          mutex_init(&zilog->zl_vdev_lock, NULL, MUTEX_DEFAULT, NULL);
1650 1650  
1651 1651          avl_create(&zilog->zl_vdev_tree, zil_vdev_compare,
1652 1652              sizeof (zil_vdev_node_t), offsetof(zil_vdev_node_t, zv_node));
1653 1653  
                   /* zl_cv_batch[0] and [1] are the two batch cvs used by zil_commit(). */
1654 1654          cv_init(&zilog->zl_cv_writer, NULL, CV_DEFAULT, NULL);
1655 1655          cv_init(&zilog->zl_cv_suspend, NULL, CV_DEFAULT, NULL);
1656 1656          cv_init(&zilog->zl_cv_batch[0], NULL, CV_DEFAULT, NULL);
1657 1657          cv_init(&zilog->zl_cv_batch[1], NULL, CV_DEFAULT, NULL);
1658 1658  
1659 1659          return (zilog);
1660 1660  }
1661 1661  
           /*
            * Tear down a zilog set up by zil_alloc(): destroy its lists, vdev tree,
            * locks and condition variables, clean any per-txg itxs still attached,
            * and free the structure. The lwb list and the itx commit list are
            * asserted to be empty.
            */
1662 1662  void
1663 1663  zil_free(zilog_t *zilog)
1664 1664  {
                   /* zil_sync() asserts that this flag is still clear. */
1665 1665          zilog->zl_stop_sync = 1;
1666 1666  
1667 1667          ASSERT(list_is_empty(&zilog->zl_lwb_list));
1668 1668          list_destroy(&zilog->zl_lwb_list);
1669 1669  
1670 1670          avl_destroy(&zilog->zl_vdev_tree);
1671 1671          mutex_destroy(&zilog->zl_vdev_lock);
1672 1672  
1673 1673          ASSERT(list_is_empty(&zilog->zl_itx_commit_list));
1674 1674          list_destroy(&zilog->zl_itx_commit_list);
1675 1675  
1676 1676          for (int i = 0; i < TXG_SIZE; i++) {
1677 1677                  /*
1678 1678                   * It's possible for an itx to be generated that doesn't dirty
1679 1679                   * a txg (e.g. ztest TX_TRUNCATE). So there's no zil_clean()
1680 1680                   * callback to remove the entry. We remove those here.
1681 1681                   *
1682 1682                   * Also free up the ziltest itxs.
1683 1683                   */
1684 1684                  if (zilog->zl_itxg[i].itxg_itxs)
1685 1685                          zil_itxg_clean(zilog->zl_itxg[i].itxg_itxs);
1686 1686                  mutex_destroy(&zilog->zl_itxg[i].itxg_lock);
1687 1687          }
1688 1688  
1689 1689          mutex_destroy(&zilog->zl_lock);
1690 1690  
1691 1691          cv_destroy(&zilog->zl_cv_writer);
1692 1692          cv_destroy(&zilog->zl_cv_suspend);
1693 1693          cv_destroy(&zilog->zl_cv_batch[0]);
1694 1694          cv_destroy(&zilog->zl_cv_batch[1]);
1695 1695  
1696 1696          kmem_free(zilog, sizeof (zilog_t));
1697 1697  }
1698 1698  
1699 1699  /*
1700 1700   * Open an intent log.
            *
            * Stores get_data in the objset's zilog, creates its "zil_clean" taskq,
            * and returns that zilog. The asserts require that the log is not
            * already open (no taskq, no get_data callback) and has no lwbs.
            * zil_close() undoes both assignments.
1701 1701   */
1702 1702  zilog_t *
1703 1703  zil_open(objset_t *os, zil_get_data_t *get_data)
1704 1704  {
1705 1705          zilog_t *zilog = dmu_objset_zil(os);
1706 1706  
1707 1707          ASSERT(zilog->zl_clean_taskq == NULL);
1708 1708          ASSERT(zilog->zl_get_data == NULL);
1709 1709          ASSERT(list_is_empty(&zilog->zl_lwb_list));
1710 1710  
1711 1711          zilog->zl_get_data = get_data;
1712 1712          zilog->zl_clean_taskq = taskq_create("zil_clean", 1, minclsyspri,
1713 1713              2, 2, TASKQ_PREPOPULATE);
1714 1714  
1715 1715          return (zilog);
1716 1716  }
1717 1717  
1718 1718  /*
1719 1719   * Close an intent log.
            *
            * Commits all outstanding itxs, waits for the txg recorded in the last
            * lwb to sync, destroys the clean taskq, clears the get_data callback,
            * and finally frees the one lwb that may remain on the list.
1720 1720   */
1721 1721  void
1722 1722  zil_close(zilog_t *zilog)
1723 1723  {
1724 1724          lwb_t *lwb;
1725 1725          uint64_t txg = 0;
1726 1726  
1727 1727          zil_commit(zilog, 0); /* commit all itx */
1728 1728  
1729 1729          /*
1730 1730           * The lwb_max_txg for the stubby lwb will reflect the last activity
1731 1731           * for the zil.  After a txg_wait_synced() on the txg we know all the
1732 1732           * callbacks have occurred that may clean the zil.  Only then can we
1733 1733           * destroy the zl_clean_taskq.
1734 1734           */
1735 1735          mutex_enter(&zilog->zl_lock);
1736 1736          lwb = list_tail(&zilog->zl_lwb_list);
1737 1737          if (lwb != NULL)
1738 1738                  txg = lwb->lwb_max_txg;
1739 1739          mutex_exit(&zilog->zl_lock);
                   /* txg stays 0 when the lwb list was empty; nothing to wait for. */
1740 1740          if (txg)
1741 1741                  txg_wait_synced(zilog->zl_dmu_pool, txg);
1742 1742  
1743 1743          taskq_destroy(zilog->zl_clean_taskq);
1744 1744          zilog->zl_clean_taskq = NULL;
1745 1745          zilog->zl_get_data = NULL;
1746 1746  
1747 1747          /*
1748 1748           * We should have only one LWB left on the list; remove it now.
1749 1749           */
1750 1750          mutex_enter(&zilog->zl_lock);
1751 1751          lwb = list_head(&zilog->zl_lwb_list);
1752 1752          if (lwb != NULL) {
                           /* Head must also be the tail, i.e. the only entry. */
1753 1753                  ASSERT(lwb == list_tail(&zilog->zl_lwb_list));
1754 1754                  list_remove(&zilog->zl_lwb_list, lwb);
1755 1755                  zio_buf_free(lwb->lwb_buf, lwb->lwb_sz);
1756 1756                  kmem_cache_free(zil_lwb_cache, lwb);
1757 1757          }
1758 1758          mutex_exit(&zilog->zl_lock);
1759 1759  }
1760 1760  
1761 1761  /*
1762 1762   * Suspend an intent log.  While in suspended mode, we still honor
1763 1763   * synchronous semantics, but we rely on txg_wait_synced() to do it.
1764 1764   * We suspend the log briefly when taking a snapshot so that the snapshot
1765 1765   * contains all the data it's supposed to, and has an empty intent log.
            *
            * Returns EBUSY if the log header has ZIL_REPLAY_NEEDED set, otherwise 0.
            * Each successful call increments zl_suspend; zil_resume() decrements it.
1766 1766   */
1767 1767  int
1768 1768  zil_suspend(zilog_t *zilog)
1769 1769  {
1770 1770          const zil_header_t *zh = zilog->zl_header;
1771 1771  
1772 1772          mutex_enter(&zilog->zl_lock);
1773 1773          if (zh->zh_flags & ZIL_REPLAY_NEEDED) {         /* unplayed log */
1774 1774                  mutex_exit(&zilog->zl_lock);
1775 1775                  return (EBUSY);
1776 1776          }
                   /*
                    * Post-increment: test whether a suspend was already in effect,
                    * and count this caller either way.
                    */
1777 1777          if (zilog->zl_suspend++ != 0) {
1778 1778                  /*
1779 1779                   * Someone else already began a suspend.
1780 1780                   * Just wait for them to finish.
1781 1781                   */
1782 1782                  while (zilog->zl_suspending)
1783 1783                          cv_wait(&zilog->zl_cv_suspend, &zilog->zl_lock);
1784 1784                  mutex_exit(&zilog->zl_lock);
1785 1785                  return (0);
1786 1786          }
                   /*
                    * First suspender: flag the suspend as in progress, then do the
                    * commit and destroy without holding zl_lock.
                    */
1787 1787          zilog->zl_suspending = B_TRUE;
1788 1788          mutex_exit(&zilog->zl_lock);
1789 1789  
1790 1790          zil_commit(zilog, 0);
1791 1791  
1792 1792          zil_destroy(zilog, B_FALSE);
1793 1793  
                   /* Done: release anyone waiting in the loop above. */
1794 1794          mutex_enter(&zilog->zl_lock);
1795 1795          zilog->zl_suspending = B_FALSE;
1796 1796          cv_broadcast(&zilog->zl_cv_suspend);
1797 1797          mutex_exit(&zilog->zl_lock);
1798 1798  
1799 1799          return (0);
1800 1800  }
1801 1801  
           /*
            * Undo one successful zil_suspend() by decrementing the suspend count
            * under zl_lock. The count is asserted to be non-zero on entry.
            */
1802 1802  void
1803 1803  zil_resume(zilog_t *zilog)
1804 1804  {
1805 1805          mutex_enter(&zilog->zl_lock);
1806 1806          ASSERT(zilog->zl_suspend != 0);
1807 1807          zilog->zl_suspend--;
1808 1808          mutex_exit(&zilog->zl_lock);
1809 1809  }
1810 1810  
           /*
            * State handed from zil_replay() to zil_replay_log_record() through
            * zil_parse().
            */
1811 1811  typedef struct zil_replay_arg {
                   /* replay vectors, indexed by transaction type */
1812 1812          zil_replay_func_t **zr_replay;
                   /* caller's argument, passed first to every replay vector */
1813 1813          void            *zr_arg;
                   /* set from BP_SHOULD_BYTESWAP() on the header's log blkptr */
1814 1814          boolean_t       zr_byteswap;
                   /* scratch buffer (2 * SPA_MAXBLOCKSIZE) holding the record copy */
1815 1815          char            *zr_lr;
1816 1816  } zil_replay_arg_t;
1817 1817  
           /*
            * Report a failed replay of log record lr and hand the error back.
            *
            * Backs zl_replaying_seq off by one, logs a CE_WARN message naming the
            * dataset, sequence number and transaction type, and returns error
            * unchanged so callers can write "return (zil_replay_error(...))".
            */
1818 1818  static int
1819 1819  zil_replay_error(zilog_t *zilog, lr_t *lr, int error)
1820 1820  {
1821 1821          char name[MAXNAMELEN];
1822 1822  
1823 1823          zilog->zl_replaying_seq--;      /* didn't actually replay this one */
1824 1824  
1825 1825          dmu_objset_name(zilog->zl_os, name);
1826 1826  
                   /* The TX_CI bit is reported separately from the txtype number. */
1827 1827          cmn_err(CE_WARN, "ZFS replay transaction error %d, "
1828 1828              "dataset %s, seq 0x%llx, txtype %llu %s\n", error, name,
1829 1829              (u_longlong_t)lr->lrc_seq,
1830 1830              (u_longlong_t)(lr->lrc_txtype & ~TX_CI),
1831 1831              (lr->lrc_txtype & TX_CI) ? "CI" : "");
1832 1832  
1833 1833          return (error);
1834 1834  }
1835 1835  
           /*
            * Record callback passed to zil_parse() by zil_replay(): replay one log
            * record through the replay vector for its transaction type.
            *
            * zra is the zil_replay_arg_t set up by zil_replay(). Returns 0 if the
            * record was skipped or replayed successfully, otherwise the error as
            * reported through zil_replay_error().
            */
1836 1836  static int
1837 1837  zil_replay_log_record(zilog_t *zilog, lr_t *lr, void *zra, uint64_t claim_txg)
1838 1838  {
1839 1839          zil_replay_arg_t *zr = zra;
1840 1840          const zil_header_t *zh = zilog->zl_header;
1841 1841          uint64_t reclen = lr->lrc_reclen;
1842 1842          uint64_t txtype = lr->lrc_txtype;
1843 1843          int error = 0;
1844 1844  
                   /* Set before the early returns; zil_replaying() reads it. */
1845 1845          zilog->zl_replaying_seq = lr->lrc_seq;
1846 1846  
1847 1847          if (lr->lrc_seq <= zh->zh_replay_seq)   /* already replayed */
1848 1848                  return (0);
1849 1849  
1850 1850          if (lr->lrc_txg < claim_txg)            /* already committed */
1851 1851                  return (0);
1852 1852  
1853 1853          /* Strip case-insensitive bit, still present in log record */
1854 1854          txtype &= ~TX_CI;
1855 1855  
                   /* txtype indexes zr_replay[] below, so reject out-of-range values. */
1856 1856          if (txtype == 0 || txtype >= TX_MAX_TYPE)
1857 1857                  return (zil_replay_error(zilog, lr, EINVAL));
1858 1858  
1859 1859          /*
1860 1860           * If this record type can be logged out of order, the object
1861 1861           * (lr_foid) may no longer exist.  That's legitimate, not an error.
1862 1862           */
1863 1863          if (TX_OOO(txtype)) {
1864 1864                  error = dmu_object_info(zilog->zl_os,
1865 1865                      ((lr_ooo_t *)lr)->lr_foid, NULL);
1866 1866                  if (error == ENOENT || error == EEXIST)
1867 1867                          return (0);
1868 1868          }
1869 1869  
1870 1870          /*
1871 1871           * Make a copy of the data so we can revise and extend it.
1872 1872           */
1873 1873          bcopy(lr, zr->zr_lr, reclen);
1874 1874  
1875 1875          /*
1876 1876           * If this is a TX_WRITE with a blkptr, suck in the data.
                    * The data lands in the copy, directly after the record itself.
1877 1877           */
1878 1878          if (txtype == TX_WRITE && reclen == sizeof (lr_write_t)) {
1879 1879                  error = zil_read_log_data(zilog, (lr_write_t *)lr,
1880 1880                      zr->zr_lr + reclen);
1881 1881                  if (error)
1882 1882                          return (zil_replay_error(zilog, lr, error));
1883 1883          }
1884 1884  
1885 1885          /*
1886 1886           * The log block containing this lr may have been byteswapped
1887 1887           * so that we can easily examine common fields like lrc_txtype.
1888 1888           * However, the log is a mix of different record types, and only the
1889 1889           * replay vectors know how to byteswap their records.  Therefore, if
1890 1890           * the lr was byteswapped, undo it before invoking the replay vector.
1891 1891           */
1892 1892          if (zr->zr_byteswap)
1893 1893                  byteswap_uint64_array(zr->zr_lr, reclen);
1894 1894  
1895 1895          /*
1896 1896           * We must now do two things atomically: replay this log record,
1897 1897           * and update the log header sequence number to reflect the fact that
1898 1898           * we did so. At the end of each replay function the sequence number
1899 1899           * is updated if we are in replay mode.
1900 1900           */
1901 1901          error = zr->zr_replay[txtype](zr->zr_arg, zr->zr_lr, zr->zr_byteswap);
1902 1902          if (error) {
1903 1903                  /*
1904 1904                   * The DMU's dnode layer doesn't see removes until the txg
1905 1905                   * commits, so a subsequent claim can spuriously fail with
1906 1906                   * EEXIST. So if we receive any error we try syncing out
1907 1907                   * any removes then retry the transaction.  Note that we
1908 1908                   * specify B_FALSE for byteswap now, so we don't do it twice.
1909 1909                   */
1910 1910                  txg_wait_synced(spa_get_dsl(zilog->zl_spa), 0);
1911 1911                  error = zr->zr_replay[txtype](zr->zr_arg, zr->zr_lr, B_FALSE);
1912 1912                  if (error)
1913 1913                          return (zil_replay_error(zilog, lr, error));
1914 1914          }
1915 1915          return (0);
1916 1916  }
1917 1917  
           /*
            * Block callback passed to zil_parse() by zil_replay(): counts each log
            * block in zl_replay_blks. bp, arg and claim_txg are unused. Always
            * returns 0.
            */
1918 1918  /* ARGSUSED */
1919 1919  static int
1920 1920  zil_incr_blks(zilog_t *zilog, blkptr_t *bp, void *arg, uint64_t claim_txg)
1921 1921  {
1922 1922          zilog->zl_replay_blks++;
1923 1923  
1924 1924          return (0);
1925 1925  }
1926 1926  
1927 1927  /*
1928 1928   * If this dataset has a non-empty intent log, replay it and destroy it.
            *
            * arg is passed through to every replay vector; replay_func is the table
            * of vectors indexed by transaction type. Errors from zil_parse() are
            * deliberately ignored here; failures are reported by
            * zil_replay_error() as they occur.
1929 1929   */
1930 1930  void
1931 1931  zil_replay(objset_t *os, void *arg, zil_replay_func_t *replay_func[TX_MAX_TYPE])
1932 1932  {
1933 1933          zilog_t *zilog = dmu_objset_zil(os);
1934 1934          const zil_header_t *zh = zilog->zl_header;
1935 1935          zil_replay_arg_t zr;
1936 1936  
                   /* No replay pending: just destroy the log and return. */
1937 1937          if ((zh->zh_flags & ZIL_REPLAY_NEEDED) == 0) {
1938 1938                  zil_destroy(zilog, B_TRUE);
1939 1939                  return;
1940 1940          }
1941 1941  
1942 1942          zr.zr_replay = replay_func;
1943 1943          zr.zr_arg = arg;
1944 1944          zr.zr_byteswap = BP_SHOULD_BYTESWAP(&zh->zh_log);
                   /*
                    * zil_replay_log_record() copies each record here and may append
                    * data read by zil_read_log_data() after it; presumably that is
                    * why the buffer is two maximum-sized blocks -- confirm.
                    */
1945 1945          zr.zr_lr = kmem_alloc(2 * SPA_MAXBLOCKSIZE, KM_SLEEP);
1946 1946  
1947 1947          /*
1948 1948           * Wait for in-progress removes to sync before starting replay.
1949 1949           */
1950 1950          txg_wait_synced(zilog->zl_dmu_pool, 0);
1951 1951  
                   /* zl_replay is what zil_replaying() tests while vectors run. */
1952 1952          zilog->zl_replay = B_TRUE;
1953 1953          zilog->zl_replay_time = ddi_get_lbolt();
1954 1954          ASSERT(zilog->zl_replay_blks == 0);
1955 1955          (void) zil_parse(zilog, zil_incr_blks, zil_replay_log_record, &zr,
1956 1956              zh->zh_claim_txg);
1957 1957          kmem_free(zr.zr_lr, 2 * SPA_MAXBLOCKSIZE);
1958 1958  
                   /* Destroy the replayed log and wait for the destroy txg to sync. */
1959 1959          zil_destroy(zilog, B_FALSE);
1960 1960          txg_wait_synced(zilog->zl_dmu_pool, zilog->zl_destroy_txg);
1961 1961          zilog->zl_replay = B_FALSE;
1962 1962  }
1963 1963  
           /*
            * Returns B_TRUE when sync is disabled for this log or when a replay is
            * in progress, B_FALSE otherwise.
            *
            * In the replay case it also dirties the dataset in tx and records the
            * sequence number being replayed in the slot for tx's txg, which
            * zil_sync() later copies into the log header.
            */
1964 1964  boolean_t
1965 1965  zil_replaying(zilog_t *zilog, dmu_tx_t *tx)
1966 1966  {
1967 1967          if (zilog->zl_sync == ZFS_SYNC_DISABLED)
1968 1968                  return (B_TRUE);
1969 1969  
1970 1970          if (zilog->zl_replay) {
1971 1971                  dsl_dataset_dirty(dmu_objset_ds(zilog->zl_os), tx);
1972 1972                  zilog->zl_replayed_seq[dmu_tx_get_txg(tx) & TXG_MASK] =
1973 1973                      zilog->zl_replaying_seq;
1974 1974                  return (B_TRUE);
1975 1975          }
1976 1976  
1977 1977          return (B_FALSE);
1978 1978  }
1979 1979  
           /*
            * Hold the objset named osname, suspend its intent log and, if that
            * succeeded, resume it again straight away. arg is unused.
            *
            * Returns the dmu_objset_hold() error if the hold fails, EEXIST if
            * zil_suspend() fails, otherwise 0. The hold is always released.
            */
1980 1980  /* ARGSUSED */
1981 1981  int
1982 1982  zil_vdev_offline(const char *osname, void *arg)
1983 1983  {
1984 1984          objset_t *os;
1985 1985          zilog_t *zilog;
1986 1986          int error;
1987 1987  
1988 1988          error = dmu_objset_hold(osname, FTAG, &os);
1989 1989          if (error)
1990 1990                  return (error);
1991 1991  
1992 1992          zilog = dmu_objset_zil(os);
                   /* Any zil_suspend() failure is reported to the caller as EEXIST. */
1993 1993          if (zil_suspend(zilog) != 0)
1994 1994                  error = EEXIST;
1995 1995          else
1996 1996                  zil_resume(zilog);
1997 1997          dmu_objset_rele(os, FTAG);
1998 1998          return (error);
1999 1999  }
  
    | 
      ↓ open down ↓ | 
    924 lines elided | 
    
      ↑ open up ↑ | 
  
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX