1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
  23  * Use is subject to license terms.
  24  */
  25 
  26 #include <sys/scsi/scsi.h>
  27 #include <sys/ddi.h>
  28 #include <sys/sunddi.h>
  29 #include <sys/thread.h>
  30 #include <sys/var.h>
  31 
  32 #include "sd_xbuf.h"
  33 
  34 /*
 * xbuf.c: buf(9S) extension facility.
  36  *
  37  * The buf(9S) extension facility is intended to allow block drivers to
  38  * allocate additional memory that is associated with a particular buf(9S)
  39  * struct.  It is further intended to help in addressing the usual set of
  40  * problems associated with such allocations, in particular those involving
  41  * recovery from allocation failures, especially in code paths that the
  42  * system relies on to free memory.
  43  *
  44  * CAVEAT: Currently this code is completely private to the sd driver and in
  45  * NO WAY constitutes a public or supported interface of any kind. It is
  46  * envisioned that this may one day migrate into the Solaris DDI, but until
  47  * that time this ought to be considered completely unstable and is subject
  48  * to change without notice. This code may NOT in any way be utilized by
  49  * ANY code outside the sd driver.
  50  */
  51 
  52 
  53 static int xbuf_iostart(ddi_xbuf_attr_t xap);
  54 static void xbuf_dispatch(ddi_xbuf_attr_t xap);
  55 static void xbuf_restart_callback(void *arg);
  56 static void xbuf_enqueue(struct buf *bp, ddi_xbuf_attr_t xap);
  57 static int xbuf_brk_done(struct buf *bp);
  58 
  59 
/*
 * Note: Should these be exposed to the caller?  Do we want to give the
 * caller the flexibility of specifying the parameters for the thread pool?
 * Note: these values are just estimates at this time, based upon what
 * seems reasonable for the sd driver. It may be preferable to make these
 * parameters self-scaling in a real (future) implementation.
 */
  67 #define XBUF_TQ_MINALLOC        64
  68 #define XBUF_TQ_MAXALLOC        512
  69 #define XBUF_DISPATCH_DELAY     (drv_usectohz(50000))   /* 50 msec */
  70 
  71 static taskq_t *xbuf_tq = NULL;
  72 static int xbuf_attr_tq_minalloc = XBUF_TQ_MINALLOC;
  73 static int xbuf_attr_tq_maxalloc = XBUF_TQ_MAXALLOC;
  74 
  75 static kmutex_t xbuf_mutex = { {NULL} };
  76 static uint32_t xbuf_refcount = 0;
  77 
  78 /*
  79  * Private wrapper for buf cloned via ddi_xbuf_qstrategy()
  80  */
  81 struct xbuf_brk {
  82         kmutex_t mutex;
  83         struct buf *bp0;
  84         uint8_t nbufs;  /* number of buf allocated */
  85         uint8_t active; /* number of active xfer */
  86 
  87         size_t brksize; /* break size used for this buf */
  88         int brkblk;
  89 
  90         /* xfer position */
  91         off_t off;
  92         off_t noff;
  93         daddr_t blkno;
  94 };
  95 
  96 _NOTE(DATA_READABLE_WITHOUT_LOCK(xbuf_brk::off))
  97 
  98 /*
  99  * Hack needed in the prototype so buf breakup will work.
 100  * Here we can rely on the sd code not changing the value in
 101  * b_forw.
 102  */
 103 #define b_clone_private b_forw
 104 
 105 
 106 /* ARGSUSED */
 107 DDII ddi_xbuf_attr_t
 108 ddi_xbuf_attr_create(size_t xsize,
 109         void (*xa_strategy)(struct buf *bp, ddi_xbuf_t xp, void *attr_arg),
 110         void *attr_arg, uint32_t active_limit, uint32_t reserve_limit,
 111         major_t major, int flags)
 112 {
 113         ddi_xbuf_attr_t xap;
 114 
 115         xap = kmem_zalloc(sizeof (struct __ddi_xbuf_attr), KM_SLEEP);
 116 
 117         mutex_init(&xap->xa_mutex, NULL, MUTEX_DRIVER, NULL);
 118         mutex_init(&xap->xa_reserve_mutex, NULL, MUTEX_DRIVER, NULL);
 119 
 120         /* Future: Allow the caller to specify alignment requirements? */
 121         xap->xa_allocsize    = max(xsize, sizeof (void *));
 122         xap->xa_active_limit = active_limit;
 123         xap->xa_active_lowater       = xap->xa_active_limit / 2;
 124         xap->xa_reserve_limit        = reserve_limit;
 125         xap->xa_strategy     = xa_strategy;
 126         xap->xa_attr_arg     = attr_arg;
 127 
 128         mutex_enter(&xbuf_mutex);
 129         if (xbuf_refcount == 0) {
 130                 ASSERT(xbuf_tq == NULL);
 131                 /*
 132                  * Note: Would be nice if: (1) #threads in the taskq pool (set
 133                  * to the value of 'ncpus' at the time the taskq is created)
 134                  * could adjust automatically with DR; (2) the taskq
 135                  * minalloc/maxalloc counts could be grown/shrunk on the fly.
 136                  */
 137                 xbuf_tq = taskq_create("xbuf_taskq", ncpus,
 138                     (v.v_maxsyspri - 2), xbuf_attr_tq_minalloc,
 139                     xbuf_attr_tq_maxalloc, TASKQ_PREPOPULATE);
 140         }
 141         xbuf_refcount++;
 142         mutex_exit(&xbuf_mutex);
 143 
 144         /* In this prototype we just always use the global system pool. */
 145         xap->xa_tq = xbuf_tq;
 146 
 147         return (xap);
 148 }
 149 
 150 
 151 DDII void
 152 ddi_xbuf_attr_destroy(ddi_xbuf_attr_t xap)
 153 {
 154         ddi_xbuf_t      xp;
 155 
 156         mutex_destroy(&xap->xa_mutex);
 157         mutex_destroy(&xap->xa_reserve_mutex);
 158 
 159         /* Free any xbufs on the reserve list */
 160         while (xap->xa_reserve_count != 0) {
 161                 xp = xap->xa_reserve_headp;
 162                 xap->xa_reserve_headp = *((void **)xp);
 163                 xap->xa_reserve_count--;
 164                 kmem_free(xp, xap->xa_allocsize);
 165         }
 166         ASSERT(xap->xa_reserve_headp == NULL);
 167 
 168         mutex_enter(&xbuf_mutex);
 169         ASSERT((xbuf_refcount != 0) && (xbuf_tq != NULL));
 170         xbuf_refcount--;
 171         if (xbuf_refcount == 0) {
 172                 taskq_destroy(xbuf_tq);
 173                 xbuf_tq = NULL;
 174         }
 175         mutex_exit(&xbuf_mutex);
 176 
 177         kmem_free(xap, sizeof (struct __ddi_xbuf_attr));
 178 }
 179 
 180 
 181 /* ARGSUSED */
 182 DDII void
 183 ddi_xbuf_attr_register_devinfo(ddi_xbuf_attr_t xbuf_attr, dev_info_t *dip)
 184 {
 185         /* Currently a no-op in this prototype */
 186 }
 187 
 188 
 189 /* ARGSUSED */
 190 DDII void
 191 ddi_xbuf_attr_unregister_devinfo(ddi_xbuf_attr_t xbuf_attr, dev_info_t *dip)
 192 {
 193         /* Currently a no-op in this prototype */
 194 }
 195 
 196 DDII int
 197 ddi_xbuf_attr_setup_brk(ddi_xbuf_attr_t xap, size_t size)
 198 {
 199         if (size < DEV_BSIZE)
 200                 return (0);
 201 
 202         mutex_enter(&xap->xa_mutex);
 203         xap->xa_brksize = size & ~(DEV_BSIZE - 1);
 204         mutex_exit(&xap->xa_mutex);
 205         return (1);
 206 }
 207 
 208 
 209 
 210 /*
 211  * Enqueue the given buf and attempt to initiate IO.
 212  * Called from the driver strategy(9E) routine.
 213  */
 214 
 215 DDII int
 216 ddi_xbuf_qstrategy(struct buf *bp, ddi_xbuf_attr_t xap)
 217 {
 218         ASSERT(xap != NULL);
 219         ASSERT(!mutex_owned(&xap->xa_mutex));
 220         ASSERT(!mutex_owned(&xap->xa_reserve_mutex));
 221 
 222         mutex_enter(&xap->xa_mutex);
 223 
 224         ASSERT((bp->b_bcount & (DEV_BSIZE - 1)) == 0);
 225 
 226         /*
 227          * Breakup buf if necessary. bp->b_private is temporarily
 228          * used to save xbuf_brk
 229          */
 230         if (xap->xa_brksize && bp->b_bcount > xap->xa_brksize) {
 231                 struct xbuf_brk *brkp;
 232 
 233                 brkp = kmem_zalloc(sizeof (struct xbuf_brk), KM_SLEEP);
 234                 _NOTE(NOW_INVISIBLE_TO_OTHER_THREADS(*brkp))
 235                 mutex_init(&brkp->mutex, NULL, MUTEX_DRIVER, NULL);
 236                 brkp->bp0 = bp;
 237                 brkp->brksize = xap->xa_brksize;
 238                 brkp->brkblk = btodt(xap->xa_brksize);
 239                 brkp->noff = xap->xa_brksize;
 240                 brkp->blkno = bp->b_blkno;
 241                 _NOTE(NOW_VISIBLE_TO_OTHER_THREADS(*brkp))
 242                 bp->b_private = brkp;
 243         } else {
 244                 bp->b_private = NULL;
 245         }
 246 
 247         /* Enqueue buf */
 248         if (xap->xa_headp == NULL) {
 249                 xap->xa_headp = xap->xa_tailp = bp;
 250         } else {
 251                 xap->xa_tailp->av_forw = bp;
 252                 xap->xa_tailp = bp;
 253         }
 254         bp->av_forw = NULL;
 255 
 256         xap->xa_pending++;
 257         mutex_exit(&xap->xa_mutex);
 258         return (xbuf_iostart(xap));
 259 }
 260 
 261 
 262 /*
 263  * Drivers call this immediately before calling biodone(9F), to notify the
 264  * framework that the indicated xbuf is no longer being used by the driver.
 265  * May be called under interrupt context.
 266  */
 267 
 268 DDII int
 269 ddi_xbuf_done(struct buf *bp, ddi_xbuf_attr_t xap)
 270 {
 271         ddi_xbuf_t xp;
 272         int done;
 273 
 274         ASSERT(bp != NULL);
 275         ASSERT(xap != NULL);
 276         ASSERT(!mutex_owned(&xap->xa_mutex));
 277         ASSERT(!mutex_owned(&xap->xa_reserve_mutex));
 278 
 279         xp = ddi_xbuf_get(bp, xap);
 280 
 281         mutex_enter(&xap->xa_mutex);
 282 
 283 #ifdef  SDDEBUG
 284         if (xap->xa_active_limit != 0) {
 285                 ASSERT(xap->xa_active_count > 0);
 286         }
 287 #endif
 288         xap->xa_active_count--;
 289 
 290         if (xap->xa_reserve_limit != 0) {
 291                 mutex_enter(&xap->xa_reserve_mutex);
 292                 if (xap->xa_reserve_count < xap->xa_reserve_limit) {
 293                         /* Put this xbuf onto the reserve list & exit */
 294                         *((void **)xp) = xap->xa_reserve_headp;
 295                         xap->xa_reserve_headp = xp;
 296                         xap->xa_reserve_count++;
 297                         mutex_exit(&xap->xa_reserve_mutex);
 298                         goto done;
 299                 }
 300                 mutex_exit(&xap->xa_reserve_mutex);
 301         }
 302 
 303         kmem_free(xp, xap->xa_allocsize);    /* return it to the system */
 304 
 305 done:
 306         if (bp->b_iodone == xbuf_brk_done) {
 307                 struct xbuf_brk *brkp = (struct xbuf_brk *)bp->b_clone_private;
 308 
 309                 brkp->active--;
 310                 if (brkp->active || xap->xa_headp == brkp->bp0) {
 311                         done = 0;
 312                 } else {
 313                         brkp->off = -1;      /* mark bp0 as completed */
 314                         done = 1;
 315                 }
 316         } else {
 317                 done = 1;
 318         }
 319 
 320         if ((xap->xa_active_limit == 0) ||
 321             (xap->xa_active_count <= xap->xa_active_lowater)) {
 322                 xbuf_dispatch(xap);
 323         }
 324 
 325         mutex_exit(&xap->xa_mutex);
 326         return (done);
 327 }
 328 
 329 static int
 330 xbuf_brk_done(struct buf *bp)
 331 {
 332         struct xbuf_brk *brkp = (struct xbuf_brk *)bp->b_clone_private;
 333         struct buf *bp0 = brkp->bp0;
 334         int done;
 335 
 336         mutex_enter(&brkp->mutex);
 337         if (bp->b_flags & B_ERROR && !(bp0->b_flags & B_ERROR)) {
 338                 bp0->b_flags |= B_ERROR;
 339                 bp0->b_error = bp->b_error;
 340         }
 341         if (bp->b_resid)
 342                 bp0->b_resid = bp0->b_bcount;
 343 
 344         freerbuf(bp);
 345         brkp->nbufs--;
 346 
 347         done = (brkp->off == -1 && brkp->nbufs == 0);
 348         mutex_exit(&brkp->mutex);
 349 
 350         /* All buf segments done */
 351         if (done) {
 352                 mutex_destroy(&brkp->mutex);
 353                 kmem_free(brkp, sizeof (struct xbuf_brk));
 354                 biodone(bp0);
 355         }
 356         return (0);
 357 }
 358 
 359 DDII void
 360 ddi_xbuf_dispatch(ddi_xbuf_attr_t xap)
 361 {
 362         mutex_enter(&xap->xa_mutex);
 363         if ((xap->xa_active_limit == 0) ||
 364             (xap->xa_active_count <= xap->xa_active_lowater)) {
 365                 xbuf_dispatch(xap);
 366         }
 367         mutex_exit(&xap->xa_mutex);
 368 }
 369 
 370 
 371 /*
 372  * ISSUE: in this prototype we cannot really implement ddi_xbuf_get()
 373  * unless we explicitly hide the xbuf pointer somewhere in the buf
 374  * during allocation, and then rely on the driver never changing it.
 375  * We can probably get away with using b_private for this for now,
 376  * tho it really is kinda gnarly.....
 377  */
 378 
 379 /* ARGSUSED */
 380 DDII ddi_xbuf_t
 381 ddi_xbuf_get(struct buf *bp, ddi_xbuf_attr_t xap)
 382 {
 383         return (bp->b_private);
 384 }
 385 
 386 
 387 /*
 388  * Initiate IOs for bufs on the queue.  Called from kernel thread or taskq
 389  * thread context. May execute concurrently for the same ddi_xbuf_attr_t.
 390  */
 391 
 392 static int
 393 xbuf_iostart(ddi_xbuf_attr_t xap)
 394 {
 395         struct buf *bp;
 396         ddi_xbuf_t xp;
 397 
 398         ASSERT(xap != NULL);
 399         ASSERT(!mutex_owned(&xap->xa_mutex));
 400         ASSERT(!mutex_owned(&xap->xa_reserve_mutex));
 401 
 402         /*
 403          * For each request on the queue, attempt to allocate the specified
 404          * xbuf extension area, and call the driver's iostart() routine.
 405          * We process as many requests on the queue as we can, until either
 406          * (1) we run out of requests; or
 407          * (2) we run out of resources; or
 408          * (3) we reach the maximum limit for the given ddi_xbuf_attr_t.
 409          */
 410         for (;;) {
 411                 mutex_enter(&xap->xa_mutex);
 412 
 413                 if ((bp = xap->xa_headp) == NULL) {
 414                         break;  /* queue empty */
 415                 }
 416 
 417                 if ((xap->xa_active_limit != 0) &&
 418                     (xap->xa_active_count >= xap->xa_active_limit)) {
 419                         break;  /* allocation limit reached */
 420                 }
 421 
 422                 /*
 423                  * If the reserve_limit is non-zero then work with the
 424                  * reserve else always allocate a new struct.
 425                  */
 426                 if (xap->xa_reserve_limit != 0) {
 427                         /*
 428                          * Don't penalize EVERY I/O by always allocating a new
 429                          * struct. for the sake of maintaining and not touching
 430                          * a reserve for a pathalogical condition that may never
 431                          * happen. Use the reserve entries first, this uses it
 432                          * like a local pool rather than a reserve that goes
 433                          * untouched. Make sure it's re-populated whenever it
 434                          * gets fully depleted just in case it really is needed.
 435                          * This is safe because under the pathalogical
 436                          * condition, when the system runs out of memory such
 437                          * that the below allocs fail, the reserve will still
 438                          * be available whether the entries are saved away on
 439                          * the queue unused or in-transport somewhere. Thus
 440                          * progress can still continue, however slowly.
 441                          */
 442                         mutex_enter(&xap->xa_reserve_mutex);
 443                         if (xap->xa_reserve_count != 0) {
 444                                 ASSERT(xap->xa_reserve_headp != NULL);
 445                                 /* Grab an xbuf from the reserve */
 446                                 xp = xap->xa_reserve_headp;
 447                                 xap->xa_reserve_headp = *((void **)xp);
 448                                 ASSERT(xap->xa_reserve_count > 0);
 449                                 xap->xa_reserve_count--;
 450                         } else {
 451                                 /*
 452                                  * Either this is the first time through,
 453                                  * or the reserve has been totally depleted.
 454                                  * Re-populate the reserve (pool). Excess
 455                                  * structs. get released in the done path.
 456                                  */
 457                                 while (xap->xa_reserve_count <
 458                                     xap->xa_reserve_limit) {
 459                                         xp = kmem_alloc(xap->xa_allocsize,
 460                                             KM_NOSLEEP);
 461                                         if (xp == NULL) {
 462                                                 break;
 463                                         }
 464                                         *((void **)xp) = xap->xa_reserve_headp;
 465                                         xap->xa_reserve_headp = xp;
 466                                         xap->xa_reserve_count++;
 467                                 }
 468                                 /* And one more to use right now. */
 469                                 xp = kmem_alloc(xap->xa_allocsize, KM_NOSLEEP);
 470                         }
 471                         mutex_exit(&xap->xa_reserve_mutex);
 472                 } else {
 473                         /*
 474                          * Try to alloc a new xbuf struct. If this fails just
 475                          * exit for now. We'll get back here again either upon
 476                          * cmd completion or via the timer handler.
 477                          * Question: what if the allocation attempt for the very
 478                          * first cmd. fails? There are no outstanding cmds so
 479                          * how do we get back here?
 480                          * Should look at un_ncmds_in_transport, if it's zero
 481                          * then schedule xbuf_restart_callback via the timer.
 482                          * Athough that breaks the architecture by bringing
 483                          * softstate data into this code.
 484                          */
 485                         xp = kmem_alloc(xap->xa_allocsize, KM_NOSLEEP);
 486                 }
 487                 if (xp == NULL) {
 488                         break; /* Can't process a cmd. right now. */
 489                 }
 490 
 491                 /*
 492                  * Always run the counter. It's used/needed when xa_active_limit
 493                  * is non-zero which is the typical (and right now only) case.
 494                  */
 495                 xap->xa_active_count++;
 496 
 497                 if (bp->b_private) {
 498                         struct xbuf_brk *brkp = bp->b_private;
 499                         struct buf *bp0 = bp;
 500 
 501                         brkp->active++;
 502 
 503                         mutex_enter(&brkp->mutex);
 504                         brkp->nbufs++;
 505                         mutex_exit(&brkp->mutex);
 506 
 507                         if (brkp->noff < bp0->b_bcount) {
 508                                 bp = bioclone(bp0, brkp->off, brkp->brksize,
 509                                     bp0->b_edev, brkp->blkno, xbuf_brk_done,
 510                                     NULL, KM_SLEEP);
 511 
 512                                 /* update xfer position */
 513                                 brkp->off = brkp->noff;
 514                                 brkp->noff += brkp->brksize;
 515                                 brkp->blkno += brkp->brkblk;
 516                         } else {
 517                                 bp = bioclone(bp0, brkp->off,
 518                                     bp0->b_bcount - brkp->off, bp0->b_edev,
 519                                     brkp->blkno, xbuf_brk_done, NULL, KM_SLEEP);
 520 
 521                                 /* unlink the buf from the list */
 522                                 xap->xa_headp = bp0->av_forw;
 523                                 bp0->av_forw = NULL;
 524                         }
 525                         bp->b_clone_private = (struct buf *)brkp;
 526                 } else {
 527                         /* unlink the buf from the list */
 528                         xap->xa_headp = bp->av_forw;
 529                         bp->av_forw = NULL;
 530                 }
 531 
 532                 /*
 533                  * Hack needed in the prototype so ddi_xbuf_get() will work.
 534                  * Here we can rely on the sd code not changing the value in
 535                  * b_private (in fact it wants it there). See ddi_get_xbuf()
 536                  */
 537                 bp->b_private = xp;
 538 
 539                 /* call the driver's iostart routine */
 540                 mutex_exit(&xap->xa_mutex);
 541                 (*(xap->xa_strategy))(bp, xp, xap->xa_attr_arg);
 542         }
 543 
 544         ASSERT(xap->xa_pending > 0);
 545         xap->xa_pending--;
 546         mutex_exit(&xap->xa_mutex);
 547         return (0);
 548 }
 549 
 550 
 551 /*
 552  * Re-start IO processing if there is anything on the queue, AND if the
 553  * restart function is not already running/pending for this ddi_xbuf_attr_t
 554  */
 555 static void
 556 xbuf_dispatch(ddi_xbuf_attr_t xap)
 557 {
 558         ASSERT(xap != NULL);
 559         ASSERT(xap->xa_tq != NULL);
 560         ASSERT(mutex_owned(&xap->xa_mutex));
 561 
 562         if ((xap->xa_headp != NULL) && (xap->xa_timeid == NULL) &&
 563             (xap->xa_pending == 0)) {
 564                 /*
 565                  * First try to see if we can dispatch the restart function
 566                  * immediately, in a taskq thread.  If this fails, then
 567                  * schedule a timeout(9F) callback to try again later.
 568                  */
 569                 if (taskq_dispatch(xap->xa_tq,
 570                     (void (*)(void *)) xbuf_iostart, xap, KM_NOSLEEP) == 0) {
 571                         /*
 572                          * Unable to enqueue the request for the taskq thread,
 573                          * try again later.  Note that this will keep re-trying
 574                          * until taskq_dispatch() succeeds.
 575                          */
 576                         xap->xa_timeid = timeout(xbuf_restart_callback, xap,
 577                             XBUF_DISPATCH_DELAY);
 578                 } else {
 579                         /*
 580                          * This indicates that xbuf_iostart() will soon be
 581                          * run for this ddi_xbuf_attr_t, and we do not need to
 582                          * schedule another invocation via timeout/taskq
 583                          */
 584                         xap->xa_pending++;
 585                 }
 586         }
 587 }
 588 
 589 /* timeout(9F) callback routine for xbuf restart mechanism. */
 590 static void
 591 xbuf_restart_callback(void *arg)
 592 {
 593         ddi_xbuf_attr_t xap = arg;
 594 
 595         ASSERT(xap != NULL);
 596         ASSERT(xap->xa_tq != NULL);
 597         ASSERT(!mutex_owned(&xap->xa_mutex));
 598 
 599         mutex_enter(&xap->xa_mutex);
 600         xap->xa_timeid = NULL;
 601         xbuf_dispatch(xap);
 602         mutex_exit(&xap->xa_mutex);
 603 }
 604 
 605 
 606 DDII void
 607 ddi_xbuf_flushq(ddi_xbuf_attr_t xap, int (*funcp)(struct buf *))
 608 {
 609         struct buf *bp;
 610         struct buf *next_bp;
 611         struct buf *prev_bp = NULL;
 612 
 613         ASSERT(xap != NULL);
 614         ASSERT(xap->xa_tq != NULL);
 615         ASSERT(!mutex_owned(&xap->xa_mutex));
 616 
 617         mutex_enter(&xap->xa_mutex);
 618 
 619         for (bp = xap->xa_headp; bp != NULL; bp = next_bp) {
 620 
 621                 next_bp = bp->av_forw;       /* Save for next iteration */
 622 
 623                 /*
 624                  * If the user-supplied function is non-NULL and returns
 625                  * FALSE, then just leave the current bp on the queue.
 626                  */
 627                 if ((funcp != NULL) && (!(*funcp)(bp))) {
 628                         prev_bp = bp;
 629                         continue;
 630                 }
 631 
 632                 /* de-queue the bp */
 633                 if (bp == xap->xa_headp) {
 634                         xap->xa_headp = next_bp;
 635                         if (xap->xa_headp == NULL) {
 636                                 xap->xa_tailp = NULL;
 637                         }
 638                 } else {
 639                         ASSERT(xap->xa_headp != NULL);
 640                         ASSERT(prev_bp != NULL);
 641                         if (bp == xap->xa_tailp) {
 642                                 ASSERT(next_bp == NULL);
 643                                 xap->xa_tailp = prev_bp;
 644                         }
 645                         prev_bp->av_forw = next_bp;
 646                 }
 647                 bp->av_forw = NULL;
 648 
 649                 /* Add the bp to the flush queue */
 650                 if (xap->xa_flush_headp == NULL) {
 651                         ASSERT(xap->xa_flush_tailp == NULL);
 652                         xap->xa_flush_headp = xap->xa_flush_tailp = bp;
 653                 } else {
 654                         ASSERT(xap->xa_flush_tailp != NULL);
 655                         xap->xa_flush_tailp->av_forw = bp;
 656                         xap->xa_flush_tailp = bp;
 657                 }
 658         }
 659 
 660         while ((bp = xap->xa_flush_headp) != NULL) {
 661                 xap->xa_flush_headp = bp->av_forw;
 662                 if (xap->xa_flush_headp == NULL) {
 663                         xap->xa_flush_tailp = NULL;
 664                 }
 665                 mutex_exit(&xap->xa_mutex);
 666                 bioerror(bp, EIO);
 667                 bp->b_resid = bp->b_bcount;
 668                 biodone(bp);
 669                 mutex_enter(&xap->xa_mutex);
 670         }
 671 
 672         mutex_exit(&xap->xa_mutex);
 673 }