1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
  23  * Use is subject to license terms.
  24  */
  25 
  26 #include <sys/types.h>
  27 #include <sys/ksynch.h>
  28 #include <sys/cmn_err.h>
  29 #include <sys/kmem.h>
  30 #include <sys/stat.h>
  31 #include <sys/errno.h>
  32 
  33 #include "../solaris/nsc_thread.h"
  34 #ifdef DS_DDICT
  35 #include "../contract.h"
  36 #endif
  37 #include <sys/nsctl/nsctl.h>
  38 
  39 #include <sys/kmem.h>
  40 #include <sys/ddi.h>
  41 
  42 #include <sys/sdt.h>              /* dtrace is S10 or later */
  43 
  44 #include "rdc_io.h"
  45 #include "rdc_bitmap.h"
  46 #include "rdc_diskq.h"
  47 #include "rdc_clnt.h"
  48 
  49 #include <sys/unistat/spcs_s.h>
  50 #include <sys/unistat/spcs_s_k.h>
  51 #include <sys/unistat/spcs_errors.h>
  52 
  53 extern nsc_io_t *_rdc_io_hc;
  54 
  55 int rdc_diskq_coalesce = 0;
  56 
  57 int
  58 _rdc_rsrv_diskq(rdc_group_t *group)
  59 {
  60         int rc = 0;
  61 
  62         mutex_enter(&group->diskqmutex);
  63         if (group->diskqfd == NULL) {
  64                 mutex_exit(&group->diskqmutex);
  65                 return (EIO);
  66         } else if ((group->diskqrsrv == 0) &&
  67             (rc = nsc_reserve(group->diskqfd, 0)) != 0) {
  68                 cmn_err(CE_WARN,
  69                     "!rdc: nsc_reserve(%s) failed %d\n",
  70                     nsc_pathname(group->diskqfd), rc);
  71         } else {
  72                 group->diskqrsrv++;
  73         }
  74 
  75         mutex_exit(&group->diskqmutex);
  76         return (rc);
  77 }
  78 
  79 void
  80 _rdc_rlse_diskq(rdc_group_t *group)
  81 {
  82         mutex_enter(&group->diskqmutex);
  83         if (group->diskqrsrv > 0 && --group->diskqrsrv == 0) {
  84                 nsc_release(group->diskqfd);
  85         }
  86         mutex_exit(&group->diskqmutex);
  87 }
  88 
  89 void
  90 rdc_wait_qbusy(disk_queue *q)
  91 {
  92         ASSERT(MUTEX_HELD(QLOCK(q)));
  93         while (q->busycnt > 0)
  94                 cv_wait(&q->busycv, QLOCK(q));
  95 }
  96 
  97 void
  98 rdc_set_qbusy(disk_queue *q)
  99 {
 100         ASSERT(MUTEX_HELD(QLOCK(q)));
 101         q->busycnt++;
 102 }
 103 
 104 void
 105 rdc_clr_qbusy(disk_queue *q)
 106 {
 107         ASSERT(MUTEX_HELD(QLOCK(q)));
 108         q->busycnt--;
 109         if (q->busycnt == 0)
 110                 cv_broadcast(&q->busycv);
 111 }
 112 
 113 int
 114 rdc_lookup_diskq(char *pathname)
 115 {
 116         rdc_u_info_t *urdc;
 117 #ifdef DEBUG
 118         rdc_k_info_t *krdc;
 119 #endif
 120         int index;
 121 
 122         for (index = 0; index < rdc_max_sets; index++) {
 123                 urdc = &rdc_u_info[index];
 124 #ifdef DEBUG
 125                 krdc = &rdc_k_info[index];
 126 #endif
 127                 ASSERT(krdc->index == index);
 128                 ASSERT(urdc->index == index);
 129                 if (!IS_ENABLED(urdc))
 130                         continue;
 131 
 132                 if (strncmp(pathname, urdc->disk_queue,
 133                     NSC_MAXPATH) == 0)
 134                         return (index);
 135         }
 136 
 137         return (-1);
 138 }
 139 
 140 void
 141 rdc_unintercept_diskq(rdc_group_t *grp)
 142 {
 143         if (!RDC_IS_DISKQ(grp))
 144                 return;
 145         if (grp->q_tok)
 146                 (void) nsc_unregister_path(grp->q_tok, 0);
 147         grp->q_tok = NULL;
 148 }
 149 
/*
 * rdc_close_diskq
 * close the group's disk queue device (if open), reset the reserve
 * count, and invalidate the in-core copy of the on-disk header.
 */
void
rdc_close_diskq(rdc_group_t *grp)
{

        if (grp == NULL) {
#ifdef DEBUG
                cmn_err(CE_WARN, "!rdc_close_diskq: NULL group!");
#endif
                return;
        }

        if (grp->diskqfd) {
                /* close failure is logged (DEBUG only) but otherwise ignored */
                if (nsc_close(grp->diskqfd) != 0) {
#ifdef DEBUG
                        cmn_err(CE_WARN, "!nsc_close on diskq failed");
#else
                        ;
                        /*EMPTY*/
#endif
                }
                grp->diskqfd = 0;
                grp->diskqrsrv = 0;
        }
        /* forget the cached header image; it must be re-read on next open */
        bzero(&grp->diskq.disk_hdr, sizeof (diskq_header));
}
 175 
 176 /*
 177  * nsc_open the diskq and attach
 178  * the nsc_fd to krdc->diskqfd
 179  */
int
rdc_open_diskq(rdc_k_info_t *krdc)
{
        rdc_u_info_t *urdc;
        rdc_group_t *grp;
        int sts;
        nsc_size_t size;
        char *diskqname;
        int mutexheld = 0;      /* whether diskqmutex is held at "fail" */

        grp = krdc->group;
        urdc = &rdc_u_info[krdc->index];

        mutex_enter(&grp->diskqmutex);
        mutexheld++;
        /* no disk queue pathname configured for this set */
        if (urdc->disk_queue[0] == '\0') {
                goto fail;
        }

        diskqname = &urdc->disk_queue[0];

        /* open the queue device unless a previous open is still cached */
        if (grp->diskqfd == NULL) {
                grp->diskqfd = nsc_open(diskqname,
                    NSC_RDCHR_ID|NSC_DEVICE|NSC_WRITE, 0, 0, 0);
                if (grp->diskqfd == NULL) {
                        cmn_err(CE_WARN, "!rdc_open_diskq: Unable to open %s",
                            diskqname);
                        goto fail;
                }
        }
        /* register the queue device path so its i/o is intercepted */
        if (!grp->q_tok)
                grp->q_tok = nsc_register_path(urdc->disk_queue,
                    NSC_DEVICE | NSC_CACHE, _rdc_io_hc);

        grp->diskqrsrv = 0; /* init reserve count */

        mutex_exit(&grp->diskqmutex);
        mutexheld--;
        /* just test a reserve release */
        sts = _rdc_rsrv_diskq(grp);
        if (!RDC_SUCCESS(sts)) {
                cmn_err(CE_WARN, "!rdc_open_diskq: Reserve failed for %s",
                    diskqname);
                goto fail;
        }
        sts = nsc_partsize(grp->diskqfd, &size);
        _rdc_rlse_diskq(grp);

        /*
         * reject a zero-sized queue device.
         * NOTE(review): if nsc_partsize() itself fails (sts != 0) this
         * falls through to the success return with size unchecked --
         * confirm that is the intended behaviour.
         */
        if ((sts == 0) && (size < 1)) {
                rdc_unintercept_diskq(grp);
                rdc_close_diskq(grp);
                goto fail;
        }

        return (0);

fail:
        /* wipe the configured pathname; the set is left without a diskq */
        bzero(&urdc->disk_queue, NSC_MAXPATH);
        if (mutexheld)
                mutex_exit(&grp->diskqmutex);
        return (-1);

}
 243 
 244 /*
 245  * rdc_count_vecs
 * simply vec++'s until sv_addr is null
 247  * returns number of vectors encountered
 248  */
 249 int
 250 rdc_count_vecs(nsc_vec_t *vec)
 251 {
 252         nsc_vec_t       *vecp;
 253         int i = 0;
 254         vecp = vec;
 255         while (vecp->sv_addr) {
 256                 vecp++;
 257                 i++;
 258         }
 259         return (i+1);
 260 }
 261 /*
 262  * rdc_setid2idx
 263  * given setid, return index
 264  */
 265 int
 266 rdc_setid2idx(int setid)
 267 {
 268 
 269         int index = 0;
 270 
 271         for (index = 0; index < rdc_max_sets; index++) {
 272                 if (rdc_u_info[index].setid == setid)
 273                         break;
 274         }
 275         if (index >= rdc_max_sets)
 276                 index = -1;
 277         return (index);
 278 }
 279 
 280 /*
 281  * rdc_idx2setid
 282  * given an index, return its setid
 283  */
 284 int
 285 rdc_idx2setid(int index)
 286 {
 287         return (rdc_u_info[index].setid);
 288 }
 289 
 290 /*
 291  * rdc_fill_ioheader
 292  * fill in all the stuff you want to save on disk
 * at the beginning of each queued write
 294  */
void
rdc_fill_ioheader(rdc_aio_t *aio, io_hdr *hd, int qpos)
{
        /* header generation is serialised by the disk queue lock */
        ASSERT(MUTEX_HELD(&rdc_k_info[aio->index].group->diskq.disk_qlock));

        hd->dat.magic = RDC_IOHDR_MAGIC;
        hd->dat.type = RDC_QUEUEIO;
        hd->dat.pos = aio->pos;
        hd->dat.hpos = aio->pos;
        hd->dat.qpos = qpos;            /* caller-supplied queue position */
        hd->dat.len = aio->len;
        hd->dat.flag = aio->flag;
        hd->dat.iostatus = aio->iostatus;
        hd->dat.setid = rdc_idx2setid(aio->index);
        hd->dat.time = nsc_time();
        if (!aio->handle)
                hd->dat.flag |= RDC_NULL_BUF; /* no real data to queue */
}
 313 
 314 /*
 315  * rdc_dump_iohdrs
 316  * give back the iohdr list
 317  * and clear out q->lastio
 318  */
void
rdc_dump_iohdrs(disk_queue *q)
{
        io_hdr *p, *r;

        ASSERT(MUTEX_HELD(QLOCK(q)));

        /* free the whole linked list of in-core io headers */
        p = q->iohdrs;
        while (p) {
                r = p->dat.next;
                kmem_free(p, sizeof (*p));
                q->hdrcnt--;
                p = r;
        }
        q->iohdrs = q->hdr_last = NULL;
        q->hdrcnt = 0;
        /* release the cached lastio buffer, if any, and reset lastio */
        if (q->lastio->handle)
                (void) nsc_free_buf(q->lastio->handle);
        bzero(&(*q->lastio), sizeof (*q->lastio));
}
 339 
 340 /*
 341  * rdc_fail_diskq
 342  * set flags, throw away q info
 343  * clean up what you can
 344  * wait for flusher threads to stop (taking into account this may be one)
 345  * takes group_lock, so conf, many, and bitmap may not be held
 346  */
void
rdc_fail_diskq(rdc_k_info_t *krdc, int wait, int flag)
{
        rdc_k_info_t *p;
        rdc_u_info_t *q = &rdc_u_info[krdc->index];
        rdc_group_t *group = krdc->group;
        disk_queue *dq = &krdc->group->diskq;

        /* already failed; nothing more to do */
        if (IS_STATE(q, RDC_DISKQ_FAILED))
                return;

        if (!(flag & RDC_NOFAIL))
                cmn_err(CE_WARN, "!disk queue %s failure", q->disk_queue);

        if (flag & RDC_DOLOG) {
                rdc_group_enter(krdc);
                rdc_group_log(krdc, RDC_NOFLUSH | RDC_ALLREMOTE,
                    "disk queue failed");
                rdc_group_exit(krdc);
        }
        mutex_enter(QHEADLOCK(dq));
        mutex_enter(QLOCK(dq));
        /*
         * quick stop of the flushers
         * other cleanup is done on the un-failing of the diskq
         */
        SET_QHEAD(dq, RDC_DISKQ_DATA_OFF);
        SET_QTAIL(dq, RDC_DISKQ_DATA_OFF);
        SET_QNXTIO(dq, RDC_DISKQ_DATA_OFF);
        SET_LASTQTAIL(dq, 0);

        /* discard any in-core io headers */
        rdc_dump_iohdrs(dq);

        mutex_exit(QLOCK(dq));
        mutex_exit(QHEADLOCK(dq));

        /* clear the bitmap reference data for this set */
        bzero(krdc->bitmap_ref, krdc->bitmap_size * BITS_IN_BYTE *
            BMAP_REF_PREF_SIZE);

        if (flag & RDC_DOLOG) /* otherwise, we already have the conf lock */
                rdc_group_enter(krdc);

        else if (!(flag & RDC_GROUP_LOCKED))
                ASSERT(MUTEX_HELD(&rdc_conf_lock));

        if (!(flag & RDC_NOFAIL)) {
                rdc_set_flags(q, RDC_DISKQ_FAILED);
        }
        rdc_clr_flags(q, RDC_QUEUING);

        /* propagate the failure to every other enabled set in the group */
        for (p = krdc->group_next; p != krdc; p = p->group_next) {
                q = &rdc_u_info[p->index];
                if (!IS_ENABLED(q))
                        continue;
                if (!(flag & RDC_NOFAIL)) {
                        rdc_set_flags(q, RDC_DISKQ_FAILED);
                }
                rdc_clr_flags(q, RDC_QUEUING);
                bzero(p->bitmap_ref, p->bitmap_size * BITS_IN_BYTE *
                    BMAP_REF_PREF_SIZE);
                /* RDC_QUEUING is cleared in group_log() */
        }

        if (flag & RDC_DOLOG)
                rdc_group_exit(krdc);

        /* can't wait for myself to go away, I'm a flusher */
        if (wait & RDC_WAIT)
                while (group->rdc_thrnum)
                        delay(2);

}
 419 
 420 /*
 421  * rdc_stamp_diskq
 422  * write out diskq header info
 423  * must have disk_qlock held
 424  * if rsrvd flag is 0, the nsc_reserve is done
 425  */
 426 int
 427 rdc_stamp_diskq(rdc_k_info_t *krdc, int rsrvd, int failflags)
 428 {
 429         nsc_vec_t       vec[2];
 430         nsc_buf_t       *head = NULL;
 431         rdc_group_t     *grp;
 432         rdc_u_info_t    *urdc;
 433         disk_queue      *q;
 434         int             rc, flags;
 435 
 436         grp = krdc->group;
 437         q = &krdc->group->diskq;
 438 
 439         ASSERT(MUTEX_HELD(&q->disk_qlock));
 440 
 441         urdc = &rdc_u_info[krdc->index];
 442 
 443         if (!rsrvd && _rdc_rsrv_diskq(grp)) {
 444                 cmn_err(CE_WARN, "!rdc_stamp_diskq: %s reserve failed",
 445                     urdc->disk_queue);
 446                 mutex_exit(QLOCK(q));
 447                 rdc_fail_diskq(krdc, RDC_NOWAIT, failflags);
 448                 mutex_enter(QLOCK(q));
 449                 return (-1);
 450         }
 451         flags = NSC_WRITE | NSC_NOCACHE | NSC_NODATA;
 452         rc = nsc_alloc_buf(grp->diskqfd, 0, 1, flags, &head);
 453 
 454         if (!RDC_SUCCESS(rc)) {
 455                 cmn_err(CE_WARN, "!Alloc buf failed for disk queue %s",
 456                     &urdc->disk_queue[0]);
 457                 mutex_exit(QLOCK(q));
 458                 rdc_fail_diskq(krdc, RDC_NOWAIT, failflags);
 459                 mutex_enter(QLOCK(q));
 460                 return (-1);
 461         }
 462         vec[0].sv_len = FBA_SIZE(1);
 463         vec[0].sv_addr = (uchar_t *)&q->disk_hdr;
 464         vec[1].sv_len = 0;
 465         vec[1].sv_addr = NULL;
 466 
 467         head->sb_vec = &vec[0];
 468 
 469 #ifdef DEBUG_DISKQ
 470         cmn_err(CE_NOTE, "!rdc_stamp_diskq: hdr: %p magic: %x state: "
 471             "%x head: %d tail: %d size: %d nitems: %d blocks: %d",
 472             q, QMAGIC(q), QSTATE(q), QHEAD(q),
 473             QTAIL(q), QSIZE(q), QNITEMS(q), QBLOCKS(q));
 474 #endif
 475 
 476         rc = nsc_write(head, 0, 1, 0);
 477 
 478         if (!RDC_SUCCESS(rc)) {
 479                 if (!rsrvd)
 480                         _rdc_rlse_diskq(grp);
 481                 cmn_err(CE_CONT, "!disk queue %s failed rc %d",
 482                     &urdc->disk_queue[0], rc);
 483                 mutex_exit(QLOCK(q));
 484                 rdc_fail_diskq(krdc, RDC_NOWAIT, failflags);
 485                 mutex_enter(QLOCK(q));
 486                 return (-1);
 487         }
 488 
 489         (void) nsc_free_buf(head);
 490         if (!rsrvd)
 491                 _rdc_rlse_diskq(grp);
 492 
 493         return (0);
 494 }
 495 
 496 /*
 497  * rdc_init_diskq_header
 498  * load initial values into the header
 499  */
void
rdc_init_diskq_header(rdc_group_t *grp, dqheader *header)
{
        int rc;
        int type = 0;
        disk_queue *q = &grp->diskq;

        ASSERT(MUTEX_HELD(QLOCK(q)));

        /* save q type if this is a failure */
        if (QSTATE(q) & RDC_QNOBLOCK)
                type = RDC_QNOBLOCK;
        /* reset the whole header to an empty queue image */
        bzero(header, sizeof (*header));
        header->h.magic = RDC_DISKQ_MAGIC;
        header->h.vers = RDC_DISKQ_VERS;
        header->h.state |= (RDC_SHUTDOWN_BAD|type); /* SHUTDOWN_OK on suspend */
        header->h.head_offset = RDC_DISKQ_DATA_OFF;
        header->h.tail_offset = RDC_DISKQ_DATA_OFF;
        header->h.nitems = 0;
        header->h.blocks = 0;
        header->h.qwrap = 0;
        SET_QNXTIO(q, QHEAD(q));
        SET_QCOALBOUNDS(q, RDC_DISKQ_DATA_OFF);

        /* do this last, as this might be a failure. get the kernel state ok */
        rc = _rdc_rsrv_diskq(grp);
        if (!RDC_SUCCESS(rc)) {
                cmn_err(CE_WARN, "!init_diskq_hdr: Reserve failed for queue");
                return;
        }
        /* record the size of the queue device in the header */
        (void) nsc_partsize(grp->diskqfd, &header->h.disk_size);
        _rdc_rlse_diskq(grp);

}
 534 
 535 /*
 536  * rdc_unfail_diskq
 537  * the diskq failed for some reason, lets try and re-start it
 538  * the old stuff has already been thrown away
 539  * should just be called from rdc_sync
 540  */
void
rdc_unfail_diskq(rdc_k_info_t *krdc)
{
        rdc_k_info_t *p;
        rdc_u_info_t *q = &rdc_u_info[krdc->index];
        rdc_group_t *group = krdc->group;
        disk_queue *dq = &group->diskq;

        rdc_group_enter(krdc);
        rdc_clr_flags(q, RDC_ASYNC);
        /* someone else won the race... */
        if (!IS_STATE(q, RDC_DISKQ_FAILED)) {
                rdc_group_exit(krdc);
                return;
        }
        rdc_clr_flags(q, RDC_DISKQ_FAILED);
        /* clear the failed/queuing state on every enabled set in the group */
        for (p = krdc->group_next; p != krdc; p = p->group_next) {
                q = &rdc_u_info[p->index];
                if (!IS_ENABLED(q))
                        continue;
                rdc_clr_flags(q, RDC_DISKQ_FAILED);
                rdc_clr_flags(q, RDC_ASYNC);
                if (IS_STATE(q, RDC_QUEUING))
                        rdc_clr_flags(q, RDC_QUEUING);
        }
        rdc_group_exit(krdc);

        mutex_enter(QLOCK(dq));

        /* reset the in-core header before stamping it back to disk */
        rdc_init_diskq_header(group, &group->diskq.disk_hdr);
        /* real i/o to the queue */
        /* clear RDC_AUXSYNCIP because we cannot halt a sync that's not here */
        krdc->aux_state &= ~RDC_AUXSYNCIP;
        if (rdc_stamp_diskq(krdc, 0, RDC_GROUP_LOCKED | RDC_DOLOG) < 0) {
                mutex_exit(QLOCK(dq));
                goto fail;
        }

        SET_QNXTIO(dq, QHEAD(dq));
        SET_QHDRCNT(dq, 0);
        SET_QSTATE(dq, RDC_SHUTDOWN_BAD); /* only suspend can write good */
        dq->iohdrs = NULL;
        dq->hdr_last = NULL;

        /* should be none, but.. */
        rdc_dump_iohdrs(dq);

        mutex_exit(QLOCK(dq));


fail:
        /* success falls through here too; restore the aux sync state */
        krdc->aux_state |= RDC_AUXSYNCIP;
        return;

}
 596 
 597 int
 598 rdc_read_diskq_header(rdc_k_info_t *krdc)
 599 {
 600         int rc;
 601         diskq_header *header;
 602         rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
 603 
 604         if (krdc->group->diskqfd == NULL) {
 605                 char buf[NSC_MAXPATH];
 606                 (void) snprintf(buf, NSC_MAXPATH, "%s:%s", urdc->secondary.intf,
 607                     &urdc->secondary.intf[0]);
 608                 cmn_err(CE_WARN, "!Disk Queue Header read failed for %s",
 609                     urdc->group_name[0] == '\0' ? buf:
 610                     &urdc->group_name[0]);
 611                 return (-1);
 612         }
 613 
 614         header = &krdc->group->diskq.disk_hdr.h;
 615         if (_rdc_rsrv_diskq(krdc->group)) {
 616                 return (-1);
 617         }
 618 
 619         rc = rdc_ns_io(krdc->group->diskqfd, NSC_RDBUF, 0,
 620             (uchar_t *)header, sizeof (diskq_header));
 621 
 622         _rdc_rlse_diskq(krdc->group);
 623 
 624         if (!RDC_SUCCESS(rc)) {
 625                 char buf[NSC_MAXPATH];
 626                 (void) snprintf(buf, NSC_MAXPATH, "%s:%s", urdc->secondary.intf,
 627                     &urdc->secondary.file[0]);
 628                 cmn_err(CE_WARN, "!Disk Queue Header read failed(%d) for %s",
 629                     rc, urdc->group_name[0] == '\0' ? buf :
 630                     &urdc->group_name[0]);
 631                 return (-1);
 632         }
 633         return (0);
 634 }
 635 
 636 /*
 637  * rdc_stop_diskq_flusher
 638  */
void
rdc_stop_diskq_flusher(rdc_k_info_t *krdc)
{
        disk_queue q, *qp;
        rdc_group_t *group;
#ifdef DEBUG
        cmn_err(CE_NOTE, "!stopping flusher threads");
#endif
        /* caller holds the group lock, QHEADLOCK and QLOCK (dropped below) */
        group = krdc->group;
        qp = &krdc->group->diskq;

        /* save the queue info */
        q = *qp;

        /* lie a little */
        SET_QTAIL(qp, RDC_DISKQ_DATA_OFF);
        SET_QHEAD(qp, RDC_DISKQ_DATA_OFF);
        SET_QSTATE(qp, RDC_QDISABLEPEND);
        SET_QSTATE(qp, RDC_STOPPINGFLUSH);

        /* drop locks to allow flushers to die */
        mutex_exit(QLOCK(qp));
        mutex_exit(QHEADLOCK(qp));
        rdc_group_exit(krdc);

        /* spin until every flusher thread in the group has exited */
        while (group->rdc_thrnum)
                delay(2);

        rdc_group_enter(krdc);
        mutex_enter(QHEADLOCK(qp));
        mutex_enter(QLOCK(qp));

        CLR_QSTATE(qp, RDC_STOPPINGFLUSH);
        /*
         * restore the saved queue state.
         * NOTE(review): this is a whole-struct copy, which also writes
         * back the embedded lock/cv fields captured above -- confirm
         * that is benign while QLOCK is held.
         */
        *qp = q;
}
 674 
 675 /*
 676  * rdc_enable_diskq
 677  * open the diskq
 678  * and stamp the header onto it.
 679  */
 680 int
 681 rdc_enable_diskq(rdc_k_info_t *krdc)
 682 {
 683         rdc_group_t *group;
 684         disk_queue *q;
 685 
 686         group = krdc->group;
 687         q = &group->diskq;
 688 
 689         if (rdc_open_diskq(krdc) < 0)
 690                 goto fail;
 691 
 692         mutex_enter(QLOCK(q));
 693         rdc_init_diskq_header(group, &group->diskq.disk_hdr);
 694 
 695         if (rdc_stamp_diskq(krdc, 0, RDC_NOLOG) < 0) {
 696                 mutex_exit(QLOCK(q));
 697                 goto fail;
 698         }
 699 
 700         SET_QNXTIO(q, QHEAD(q));
 701 
 702         mutex_exit(QLOCK(q));
 703         return (0);
 704 
 705 fail:
 706         mutex_enter(&group->diskqmutex);
 707         rdc_close_diskq(group);
 708         mutex_exit(&group->diskqmutex);
 709 
 710         /* caller has to fail diskq after dropping conf & many locks */
 711         return (RDC_EQNOADD);
 712 }
 713 
 714 /*
 715  * rdc_resume_diskq
 716  * open the diskq and read the header
 717  */
int
rdc_resume_diskq(rdc_k_info_t *krdc)
{
        rdc_u_info_t *urdc;
        rdc_group_t *group;
        disk_queue *q;
        int rc = 0;

        urdc = &rdc_u_info[krdc->index];
        group = krdc->group;
        q = &group->diskq;

        if (rdc_open_diskq(krdc) < 0) {
                rc = RDC_EQNOADD;
                goto fail;
        }

        mutex_enter(QLOCK(q));

        /* start from known defaults, then overlay the on-disk header */
        rdc_init_diskq_header(group, &group->diskq.disk_hdr);

        if (rdc_read_diskq_header(krdc) < 0) {
                SET_QSTATE(q, RDC_QBADRESUME);
                rc = RDC_EQNOADD;
        }

        /* check diskq magic number */
        if (QMAGIC(q) != RDC_DISKQ_MAGIC) {
                cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s,"
                    " incorrect magic number in header", urdc->disk_queue);
                rdc_init_diskq_header(group, &group->diskq.disk_hdr);
                SET_QSTATE(q, RDC_QBADRESUME);
                rc = RDC_EQNOADD;
        } else switch (QVERS(q)) {
                diskq_header1 h1;       /* version 1 header */
                diskq_header *hc;       /* current header */

#ifdef  NSC_MULTI_TERABYTE
                case RDC_DISKQ_VER_ORIG:
                        /* version 1 diskq header, upgrade to 64bit version */
                h1 = *(diskq_header1 *)(&group->diskq.disk_hdr.h);
                hc = &group->diskq.disk_hdr.h;

                cmn_err(CE_WARN, "!SNDR: old version header for diskq %s,"
                    " upgrading to current version", urdc->disk_queue);
                hc->vers = RDC_DISKQ_VERS;
                hc->state = h1.state;
                hc->head_offset = h1.head_offset;
                hc->tail_offset = h1.tail_offset;
                hc->disk_size = h1.disk_size;
                hc->nitems = h1.nitems;
                hc->blocks = h1.blocks;
                hc->qwrap = h1.qwrap;
                hc->auxqwrap = h1.auxqwrap;
                hc->seq_last = h1.seq_last;
                hc->ack_last = h1.ack_last;

                /* only an empty v1 queue can be upgraded in place */
                if (hc->nitems > 0) {
                        cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s,"
                            " old version Q contains data", urdc->disk_queue);
                        rdc_init_diskq_header(group, &group->diskq.disk_hdr);
                        SET_QSTATE(q, RDC_QBADRESUME);
                        rc = RDC_EQNOADD;
                }
                break;
#else
                case RDC_DISKQ_VER_64BIT:
                        cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s,"
                            " diskq header newer than current version",
                            urdc->disk_queue);
                        rdc_init_diskq_header(group, &group->diskq.disk_hdr);
                        SET_QSTATE(q, RDC_QBADRESUME);
                        rc = RDC_EQNOADD;
                break;
#endif
                case RDC_DISKQ_VERS:
                        /* okay, current version diskq */
                break;
                default:
                        cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s,"
                            " unknown diskq header version", urdc->disk_queue);
                        rdc_init_diskq_header(group, &group->diskq.disk_hdr);
                        SET_QSTATE(q, RDC_QBADRESUME);
                        rc = RDC_EQNOADD;
                break;
        }
        if (IS_QSTATE(q, RDC_SHUTDOWN_BAD)) {
                cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s,"
                    " unsafe shutdown", urdc->disk_queue);
                rdc_init_diskq_header(group, &group->diskq.disk_hdr);
                SET_QSTATE(q, RDC_QBADRESUME);
                rc = RDC_EQNOADD;
        }

        /* assume an unsafe shutdown until suspend marks it clean */
        CLR_QSTATE(q, RDC_SHUTDOWN_OK);
        SET_QSTATE(q, RDC_SHUTDOWN_BAD);

        /* bad, until proven not bad */
        if (rdc_stamp_diskq(krdc, 0, RDC_NOLOG) < 0) {
                rdc_fail_diskq(krdc, RDC_NOWAIT, RDC_NOLOG);
                rc = RDC_EQNOADD;
        }

        SET_QNXTIO(q, QHEAD(q));
        /* seed the high-water marks from the resumed header */
        group->diskq.nitems_hwm = QNITEMS(q);
        group->diskq.blocks_hwm = QBLOCKS(q);

        mutex_exit(QLOCK(q));

#ifdef DEBUG
        cmn_err(CE_NOTE, "!rdc_resume_diskq: resuming diskq %s \n",
            urdc->disk_queue);
        cmn_err(CE_NOTE, "!qinfo: " QDISPLAY(q));
#endif
        if (rc == 0)
                return (0);

fail:

        /* caller has to set the diskq failed after dropping its locks */
        return (rc);

}
 841 
int
rdc_suspend_diskq(rdc_k_info_t *krdc)
{
        int rc;
        rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
        disk_queue *q;

        q = &krdc->group->diskq;

        /* grab both diskq locks as we are going to kill the flusher */
        mutex_enter(QHEADLOCK(q));
        mutex_enter(QLOCK(q));

        if ((krdc->group->rdc_thrnum) && (!IS_QSTATE(q, RDC_STOPPINGFLUSH))) {
                SET_QSTATE(q, RDC_STOPPINGFLUSH);
                rdc_stop_diskq_flusher(krdc);
                CLR_QSTATE(q, RDC_STOPPINGFLUSH);
        }

        /* mark the on-disk header as cleanly shut down */
        krdc->group->diskq.disk_hdr.h.state &= ~RDC_SHUTDOWN_BAD;
        krdc->group->diskq.disk_hdr.h.state |= RDC_SHUTDOWN_OK;
        krdc->group->diskq.disk_hdr.h.state &= ~RDC_QBADRESUME;

        /* let's make sure that the flusher has stopped.. */
        if (krdc->group->rdc_thrnum) {
                mutex_exit(QLOCK(q));
                mutex_exit(QHEADLOCK(q));
                rdc_group_exit(krdc);

                while (krdc->group->rdc_thrnum)
                        delay(5);

                rdc_group_enter(krdc);
                /*
                 * NOTE(review): locks are re-taken here as QLOCK then
                 * QHEADLOCK, the reverse of the order used at function
                 * entry -- confirm this cannot deadlock with that path.
                 */
                mutex_enter(QLOCK(q));
                mutex_enter(QHEADLOCK(q));
        }
        /*
         * write refcount to the bitmap.
         * NOTE(review): this error path jumps to "fail" with QLOCK and
         * QHEADLOCK still held -- verify against the callers' lock
         * expectations.
         */
        if ((rc = rdc_write_refcount(krdc)) < 0) {
                rdc_group_exit(krdc);
                goto fail;
        }

        if (!QEMPTY(q)) {
                rdc_set_flags(urdc, RDC_QUEUING);
        } else {
                rdc_clr_flags(urdc, RDC_QUEUING);
        }

        /* fill in diskq header info */
        krdc->group->diskq.disk_hdr.h.state &= ~RDC_QDISABLEPEND;

#ifdef DEBUG
        cmn_err(CE_NOTE, "!suspending disk queue\n" QDISPLAY(q));
#endif

        /* to avoid a possible deadlock, release in order, and reacquire */
        mutex_exit(QLOCK(q));
        mutex_exit(QHEADLOCK(q));

        if (krdc->group->count > 1) {
                rdc_group_exit(krdc);
                goto fail; /* just stamp on the last suspend */
        }
        rdc_group_exit(krdc); /* in case this stamp fails */
        mutex_enter(QLOCK(q));

        rc = rdc_stamp_diskq(krdc, 0, RDC_NOLOG);

        mutex_exit(QLOCK(q));

fail:
        rdc_group_enter(krdc);

        /* diskq already failed if stamp failed */

        return (rc);
}
 919 
 920 /*
 921  * copy orig aio to copy, including the nsc_buf_t
 922  */
 923 int
 924 rdc_dup_aio(rdc_aio_t *orig, rdc_aio_t *copy)
 925 {
 926         int rc;
 927         bcopy(orig, copy, sizeof (*orig));
 928         copy->handle = NULL;
 929 
 930         if (orig->handle == NULL) /* no buf to alloc/copy */
 931                 return (0);
 932 
 933         rc = nsc_alloc_abuf(orig->pos, orig->len, 0, &copy->handle);
 934         if (!RDC_SUCCESS(rc)) {
 935 #ifdef DEBUG
 936                 cmn_err(CE_WARN, "!rdc_dup_aio: alloc_buf failed (%d)", rc);
 937 #endif
 938                 return (rc);
 939         }
 940         rc = nsc_copy(orig->handle, copy->handle, orig->pos,
 941             orig->pos, orig->len);
 942         if (!RDC_SUCCESS(rc)) {
 943                 (void) nsc_free_buf(copy->handle);
 944 #ifdef DEBUG
 945                 cmn_err(CE_WARN, "!rdc_dup_aio: copy buf failed (%d)", rc);
 946 #endif
 947                 return (rc);
 948         }
 949         return (0);
 950 }
 951 
 952 /*
 953  * rdc_qfill_shldwakeup()
 954  * 0 if the memory queue has filled, and the low water
 955  * mark has not been reached. 0 if diskq is empty.
 956  * 1 if less than low water mark
 957  * net_queue mutex is already held
 958  */
int
rdc_qfill_shldwakeup(rdc_k_info_t *krdc)
{
        rdc_group_t *group = krdc->group;
        rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
        net_queue *nq = &group->ra_queue;
        disk_queue *dq = &group->diskq;

        ASSERT(MUTEX_HELD(&nq->net_qlock));

        /* only meaningful for disk-queue groups */
        if (!RDC_IS_DISKQ(krdc->group))
                return (0);

        if (nq->qfill_sleeping != RDC_QFILL_ASLEEP)
                return (0);

        /* a stop request must wake the thread so it can exit */
        if (nq->qfflags & RDC_QFILLSTOP)
                return (1);

        if (nq->qfflags & RDC_QFILLSLEEP)
                return (0);

        if (IS_STATE(urdc, RDC_LOGGING) || IS_STATE(urdc, RDC_SYNCING))
                return (0);

        /* nothing to drain: next-io has caught up with the tail */
        mutex_enter(QLOCK(dq));
        if ((QNXTIO(dq) == QTAIL(dq)) && !IS_QSTATE(dq, RDC_QFULL)) {
                mutex_exit(QLOCK(dq));
                return (0);
        }
        mutex_exit(QLOCK(dq));

        if (nq->qfill_sleeping == RDC_QFILL_ASLEEP) {
                /* after a high-water hit, wait for the low-water mark */
                if (nq->hwmhit) {
                        if (nq->blocks <= RDC_LOW_QBLOCKS) {
                                nq->hwmhit = 0;
                        } else {
                                return (0);
                        }
                }
#ifdef DEBUG_DISKQ_NOISY
                cmn_err(CE_NOTE, "!Waking up diskq->memq flusher, flags 0x%x"
                    " idx: %d", rdc_get_vflags(urdc), urdc->index);
#endif
                return (1);
        }
        return (0);

}
1008 
1009 /*
1010  * rdc_diskq_enqueue
1011  * enqueue one i/o to the diskq
1012  * after appending some metadata to the front
1013  */
1014 int
1015 rdc_diskq_enqueue(rdc_k_info_t *krdc, rdc_aio_t *aio)
1016 {
1017         nsc_vec_t       *vec = NULL;
1018         nsc_buf_t       *bp = NULL;
1019         nsc_buf_t       *qbuf = NULL;
1020         io_hdr          *iohdr = NULL;
1021         disk_queue      *q;
1022         rdc_group_t     *group;
1023         int             numvecs;
1024         int             i, j, rc = 0;
1025         int             retries = 0;
1026         rdc_u_info_t    *urdc;
1027         nsc_size_t      iofbas; /* len of io + io header len */
1028         int             qtail;
1029         int             delay_time = 2;
1030         int             print_msg = 1;
1031 
1032 #ifdef DEBUG_WRITER_UBERNOISE
1033         int             qhead;
1034 #endif
1035         urdc = &rdc_u_info[krdc->index];
1036         group = krdc->group;
1037         q = &group->diskq;
1038 
1039         mutex_enter(QLOCK(q));
1040 
1041         /*
1042          * there is a thread that is blocking because the queue is full,
1043          * don't try to set up this write until all is clear
1044          * check before and after for logging or failed queue just
1045          * in case a thread was in flight while the queue was full,
1046          * and in the proccess of failing
1047          */
1048         while (IS_QSTATE(q, RDC_QFULL)) {
1049                 if (IS_STATE(urdc, RDC_DISKQ_FAILED) ||
1050                     (IS_STATE(urdc, RDC_LOGGING) &&
1051                     !IS_STATE(urdc, RDC_QUEUING))) {
1052                         mutex_exit(QLOCK(q));
1053                         if (aio->handle)
1054                                 (void) nsc_free_buf(aio->handle);
1055                         return (-1);
1056                 }
1057                 cv_wait(&q->qfullcv, QLOCK(q));
1058 
1059                 if (IS_STATE(urdc, RDC_DISKQ_FAILED) ||
1060                     (IS_STATE(urdc, RDC_LOGGING) &&
1061                     !IS_STATE(urdc, RDC_QUEUING))) {
1062                         mutex_exit(QLOCK(q));
1063                         if (aio->handle)
1064                                 (void) nsc_free_buf(aio->handle);
1065                         return (-1);
1066                 }
1067 
1068         }
1069 
1070         SET_QSTATE(q, QTAILBUSY);
1071 
1072         if (aio->handle == NULL) {
1073                 /* we're only going to write the header to the queue */
1074                 numvecs = 2; /* kmem_alloc io header + null terminate */
1075                 iofbas = FBA_LEN(sizeof (io_hdr));
1076 
1077         } else {
1078                 /* find out how many vecs */
1079                 numvecs = rdc_count_vecs(aio->handle->sb_vec) + 1;
1080                 iofbas = aio->len + FBA_LEN(sizeof (io_hdr));
1081         }
1082 
1083         /*
1084          * this, in conjunction with QTAILBUSY, will prevent
1085          * premature dequeuing
1086          */
1087 
1088         SET_LASTQTAIL(q, QTAIL(q));
1089 
1090         iohdr = (io_hdr *) kmem_zalloc(sizeof (io_hdr), KM_NOSLEEP);
1091         vec = (nsc_vec_t *) kmem_zalloc(sizeof (nsc_vec_t) * numvecs,
1092             KM_NOSLEEP);
1093 
1094         if (!vec || !iohdr) {
1095                 if (!vec) {
1096                         cmn_err(CE_WARN, "!vec kmem alloc failed");
1097                 } else {
1098                         cmn_err(CE_WARN, "!iohdr kmem alloc failed");
1099                 }
1100                 if (vec)
1101                         kmem_free(vec, sizeof (*vec));
1102                 if (iohdr)
1103                         kmem_free(iohdr, sizeof (*iohdr));
1104                 CLR_QSTATE(q, QTAILBUSY);
1105                 SET_LASTQTAIL(q, 0);
1106                 mutex_exit(QLOCK(q));
1107                 if (aio->handle)
1108                         (void) nsc_free_buf(aio->handle);
1109                 return (ENOMEM);
1110         }
1111 
1112         vec[numvecs - 1].sv_len = 0;
1113         vec[numvecs - 1].sv_addr = 0;
1114 
1115         /* now add the write itself */
1116         bp = aio->handle;
1117 
1118         for (i = 1, j = 0; bp && bp->sb_vec[j].sv_addr &&
1119             i < numvecs; i++, j++) {
1120                 vec[i].sv_len = bp->sb_vec[j].sv_len;
1121                 vec[i].sv_addr = bp->sb_vec[j].sv_addr;
1122         }
1123 
1124 retry:
1125 
1126         /* check for queue wrap, then check for overflow */
1127         if (IS_STATE(urdc, RDC_DISKQ_FAILED) ||
1128             (IS_STATE(urdc, RDC_LOGGING) && !IS_STATE(urdc, RDC_QUEUING))) {
1129                 kmem_free(iohdr, sizeof (*iohdr));
1130                 kmem_free(vec, sizeof (*vec) * numvecs);
1131                 CLR_QSTATE(q, QTAILBUSY);
1132                 SET_LASTQTAIL(q, 0);
1133                 if (IS_QSTATE(q, RDC_QFULL)) { /* wakeup blocked threads */
1134                         CLR_QSTATE(q, RDC_QFULL);
1135                         cv_broadcast(&q->qfullcv);
1136                 }
1137                 mutex_exit(QLOCK(q));
1138                 if (aio->handle)
1139                         (void) nsc_free_buf(aio->handle);
1140 
1141                 return (-1);
1142         }
1143 
1144         if (QTAILSHLDWRAP(q, iofbas)) {
1145                 /*
1146                  * just go back to the beginning of the disk
1147                  * it's not worth the trouble breaking up the write
1148                  */
1149 #ifdef DEBUG_DISKQWRAP
1150                 cmn_err(CE_NOTE, "!wrapping Q tail: " QDISPLAY(q));
1151 #endif
1152                 /*LINTED*/
1153                 WRAPQTAIL(q);
1154         }
1155 
1156         /*
1157          * prepend the write's metadata
1158          */
1159         rdc_fill_ioheader(aio, iohdr, QTAIL(q));
1160 
1161         vec[0].sv_len = FBA_SIZE(1);
1162         vec[0].sv_addr = (uchar_t *)iohdr;
1163 
1164         /* check for tail < head */
1165 
1166         if (!(FITSONQ(q, iofbas))) {
1167                 /*
1168                  * don't allow any more writes to start
1169                  */
1170                 SET_QSTATE(q, RDC_QFULL);
1171                 mutex_exit(QLOCK(q));
1172 
1173                 if ((!group->rdc_writer) && !IS_STATE(urdc, RDC_LOGGING))
1174                         (void) rdc_writer(krdc->index);
1175 
1176                 delay(delay_time);
1177                 q->throttle_delay += delay_time;
1178                 retries++;
1179                 delay_time *= 2; /* fairly aggressive */
1180                 if ((retries >= 8) || (delay_time >= 256)) {
1181                         delay_time = 2;
1182                         if (print_msg) {
1183                                 cmn_err(CE_WARN, "!enqueue: disk queue %s full",
1184                                     &urdc->disk_queue[0]);
1185                                 print_msg = 0;
1186 #ifdef DEBUG
1187                                 cmn_err(CE_WARN, "!qinfo: " QDISPLAY(q));
1188 #else
1189                                 cmn_err(CE_CONT, "!qinfo: " QDISPLAYND(q));
1190 #endif
1191                         }
1192                         /*
1193                          * if this is a no-block queue, or this is a blocking
1194                          * queue that is not flushing. reset and log
1195                          */
1196                         if ((QSTATE(q) & RDC_QNOBLOCK) ||
1197                             (IS_STATE(urdc, RDC_QUEUING))) {
1198 
1199                                 if (IS_STATE(urdc, RDC_QUEUING)) {
1200                 cmn_err(CE_WARN, "!SNDR: disk queue %s full and not flushing. "
1201                     "giving up", &urdc->disk_queue[0]);
1202                 cmn_err(CE_WARN, "!SNDR: %s:%s entering logging mode",
1203                     urdc->secondary.intf, urdc->secondary.file);
1204                                 }
1205 
1206                                 rdc_fail_diskq(krdc, RDC_WAIT,
1207                                     RDC_DOLOG | RDC_NOFAIL);
1208                                 kmem_free(iohdr, sizeof (*iohdr));
1209                                 kmem_free(vec, sizeof (*vec) * numvecs);
1210                                 mutex_enter(QLOCK(q));
1211                                 CLR_QSTATE(q, QTAILBUSY | RDC_QFULL);
1212                                 cv_broadcast(&q->qfullcv);
1213                                 mutex_exit(QLOCK(q));
1214                                 SET_LASTQTAIL(q, 0);
1215                                 if (aio->handle)
1216                                         (void) nsc_free_buf(aio->handle);
1217                                 return (ENOMEM);
1218                         }
1219                 }
1220 
1221                 mutex_enter(QLOCK(q));
1222                 goto retry;
1223 
1224         }
1225 
1226         qtail = QTAIL(q);
1227 #ifdef DEBUG_WRITER_UBERNOISE
1228         qhead = QHEAD(q);
1229 #endif
1230 
1231         /* update tail pointer, nitems on queue and blocks on queue */
1232         INC_QTAIL(q, iofbas); /* increment tail over i/o size + ioheader size */
1233         INC_QNITEMS(q, 1);
1234         /* increment counter for i/o blocks only */
1235         INC_QBLOCKS(q, (iofbas - FBA_LEN(sizeof (io_hdr))));
1236 
1237         if (QNITEMS(q) > q->nitems_hwm)
1238                 q->nitems_hwm = QNITEMS(q);
1239         if (QBLOCKS(q) > q->blocks_hwm)
1240                 q->blocks_hwm = QBLOCKS(q);
1241 
1242         if (IS_QSTATE(q, RDC_QFULL)) {
1243                 CLR_QSTATE(q, RDC_QFULL);
1244                 cv_broadcast(&q->qfullcv);
1245         }
1246 
1247         mutex_exit(QLOCK(q));
1248 
1249         /*
1250          * if (krdc->io_kstats) {
1251          *      mutex_enter(krdc->io_kstats->ks_lock);
1252          *      kstat_waitq_enter(KSTAT_IO_PTR(krdc->io_kstats));
1253          *      mutex_exit(krdc->io_kstats->ks_lock);
1254          * }
1255          */
1256 
1257         DTRACE_PROBE(rdc_diskq_rsrv);
1258 
1259         if (_rdc_rsrv_diskq(group)) {
1260                 cmn_err(CE_WARN, "!rdc_enqueue: %s reserve failed",
1261                     &urdc->disk_queue[0]);
1262                 rdc_fail_diskq(krdc, RDC_WAIT, RDC_DOLOG);
1263                 kmem_free(iohdr, sizeof (*iohdr));
1264                 kmem_free(vec, sizeof (*vec) * numvecs);
1265                 mutex_enter(QLOCK(q));
1266                 CLR_QSTATE(q, QTAILBUSY);
1267                 SET_LASTQTAIL(q, 0);
1268                 mutex_exit(QLOCK(q));
1269                 if (aio->handle)
1270                         (void) nsc_free_buf(aio->handle);
1271                 return (-1);
1272         }
1273 
1274 /* XXX for now do this, but later pre-alloc handle in enable/resume */
1275 
1276         DTRACE_PROBE(rdc_diskq_alloc_start);
1277         rc = nsc_alloc_buf(group->diskqfd, qtail, iofbas,
1278             NSC_NOCACHE | NSC_WRITE | NSC_NODATA, &qbuf);
1279 
1280         DTRACE_PROBE(rdc_diskq_alloc_end);
1281 
1282         if (!RDC_SUCCESS(rc)) {
1283                 cmn_err(CE_WARN, "!disk queue %s alloc failed(%d) %" NSC_SZFMT,
1284                     &urdc->disk_queue[0], rc, iofbas);
1285                 rdc_fail_diskq(krdc, RDC_WAIT, RDC_DOLOG);
1286                 rc = ENOMEM;
1287                 goto fail;
1288         }
1289         /* move vec and write to queue */
1290         qbuf->sb_vec = &vec[0];
1291 
1292 #ifdef DEBUG_WRITER_UBERNOISE
1293 
1294         cmn_err(CE_NOTE, "!about to write to queue, qbuf: %p, qhead: %d, "
1295             "qtail: %d, len: %d contents: %c%c%c%c%c",
1296             (void *) qbuf, qhead, qtail, iofbas,
1297             qbuf->sb_vec[1].sv_addr[0],
1298             qbuf->sb_vec[1].sv_addr[1],
1299             qbuf->sb_vec[1].sv_addr[2],
1300             qbuf->sb_vec[1].sv_addr[3],
1301             qbuf->sb_vec[1].sv_addr[4]);
1302         cmn_err(CE_CONT, "!qinfo: " QDISPLAYND(q));
1303 
1304 #endif
1305 
1306         DTRACE_PROBE2(rdc_diskq_nswrite_start, int, qtail, nsc_size_t, iofbas);
1307         rc = nsc_write(qbuf, qtail, iofbas, 0);
1308         DTRACE_PROBE2(rdc_diskq_nswrite_end, int, qtail, nsc_size_t, iofbas);
1309 
1310         if (!RDC_SUCCESS(rc)) {
1311                 cmn_err(CE_WARN, "!disk queue %s write failed %d",
1312                     &urdc->disk_queue[0], rc);
1313                 rdc_fail_diskq(krdc, RDC_WAIT, RDC_DOLOG);
1314                 goto fail;
1315 
1316         }
1317 
1318         mutex_enter(QLOCK(q));
1319 
1320         SET_LASTQTAIL(q, 0);
1321         CLR_QSTATE(q, QTAILBUSY);
1322 
1323         mutex_exit(QLOCK(q));
1324 
1325 fail:
1326 
1327         /*
1328          * return what should be returned
1329          * the aio is returned in _rdc_write after status is gathered.
1330          */
1331 
1332         if (qbuf)
1333                 qbuf->sb_vec = 0;
1334         (void) nsc_free_buf(qbuf);
1335 
1336         if (aio->handle)
1337                 (void) nsc_free_buf(aio->handle);
1338 
1339         _rdc_rlse_diskq(group);
1340         DTRACE_PROBE(rdc_diskq_rlse);
1341 
1342         /* free the iohdr and the vecs */
1343 
1344         if (iohdr)
1345                 kmem_free(iohdr, sizeof (*iohdr));
1346         if (vec)
1347                 kmem_free(vec, sizeof (*vec) * numvecs);
1348 
1349         /* if no flusher running, start one */
1350         if ((!krdc->group->rdc_writer) && !IS_STATE(urdc, RDC_LOGGING))
1351                 (void) rdc_writer(krdc->index);
1352 
1353         return (rc);
1354 }
1355 
1356 /*
1357  * place this on the pending list of io_hdr's out for flushing
1358  */
1359 void
1360 rdc_add_iohdr(io_hdr *header, rdc_group_t *group)
1361 {
1362         disk_queue *q = NULL;
1363 #ifdef DEBUG
1364         io_hdr *p;
1365 #endif
1366 
1367         q = &group->diskq;
1368 
1369         /* paranoia */
1370         header->dat.next = NULL;
1371 
1372         mutex_enter(QLOCK(q));
1373 #ifdef DEBUG /* AAAH! double flush!? */
1374         p = q->iohdrs;
1375         while (p) {
1376                 if (p->dat.qpos == header->dat.qpos) {
1377                         cmn_err(CE_WARN, "!ADDING DUPLICATE HEADER %" NSC_SZFMT,
1378                             p->dat.qpos);
1379                         kmem_free(header, sizeof (*header));
1380                         mutex_exit(QLOCK(q));
1381                         return;
1382                 }
1383                 p = p->dat.next;
1384         }
1385 #endif
1386         if (q->iohdrs == NULL) {
1387                 q->iohdrs = q->hdr_last = header;
1388                 q->hdrcnt = 1;
1389                 mutex_exit(QLOCK(q));
1390                 return;
1391         }
1392 
1393         q->hdr_last->dat.next = header;
1394         q->hdr_last = header;
1395         q->hdrcnt++;
1396         mutex_exit(QLOCK(q));
1397         return;
1398 
1399 }
1400 
1401 /*
1402  * mark an io header as flushed. If it is the qhead,
1403  * then update the qpointers
1404  * free the io_hdrs
1405  * called after the bitmap is cleared by flusher
1406  */
void
rdc_clr_iohdr(rdc_k_info_t *krdc, nsc_size_t qpos)
{
	rdc_group_t *group = krdc->group;
	disk_queue *q = NULL;
	io_hdr	*hp = NULL;
	io_hdr	*p = NULL;
	int found = 0;
	int cnt = 0;

#ifndef NSC_MULTI_TERABYTE
	ASSERT(qpos >= 0);   /* assertion to validate change for 64bit */
	if (qpos < 0) /* not a diskq offset */
		return;
#endif

	q = &group->diskq;
	mutex_enter(QLOCK(q));

	/* p trails hp by one node for unlinking below */
	hp = p = q->iohdrs;

	/* find outstanding io_hdr */
	while (hp) {
		if (hp->dat.qpos == qpos) {
			found++;
			break;
		}
		cnt++;
		p = hp;
		hp = hp->dat.next;
	}

	if (!found) {
		/*
		 * already gone from the list; only complain if the
		 * position is still in the unflushed region of the queue
		 */
		if (RDC_BETWEEN(QHEAD(q), QNXTIO(q), qpos)) {
#ifdef DEBUG
			cmn_err(CE_WARN, "!iohdr already cleared? "
			"qpos %" NSC_SZFMT " cnt %d ", qpos, cnt);
			cmn_err(CE_WARN, "!Qinfo: " QDISPLAY(q));
#endif
			mutex_exit(QLOCK(q));
			return;
		}
		mutex_exit(QLOCK(q));
		return;
	}

	/* mark it as flushed */
	hp->dat.iostatus = RDC_IOHDR_DONE;

	/*
	 * if it is the head pointer, travel the list updating the queue
	 * pointers until the next unflushed is reached, freeing on the way.
	 */
	while (hp && (hp->dat.qpos == QHEAD(q)) &&
	    (hp->dat.iostatus == RDC_IOHDR_DONE)) {
#ifdef DEBUG_FLUSHER_UBERNOISE
		cmn_err(CE_NOTE, "!clr_iohdr info: magic %x type %d pos %d"
		    " qpos %d hpos %d len %d flag 0x%x iostatus %x setid %d",
		    hp->dat.magic, hp->dat.type, hp->dat.pos, hp->dat.qpos,
		    hp->dat.hpos, hp->dat.len, hp->dat.flag,
		    hp->dat.iostatus, hp->dat.setid);
#endif
		/* NULL_BUF entries occupy only the header on disk */
		if (hp->dat.flag & RDC_NULL_BUF) {
			INC_QHEAD(q, FBA_LEN(sizeof (io_hdr)));
		} else {
			INC_QHEAD(q, FBA_LEN(sizeof (io_hdr)) + hp->dat.len);
			DEC_QBLOCKS(q, hp->dat.len);
		}

		DEC_QNITEMS(q, 1);

		if (QHEADSHLDWRAP(q)) { /* simple enough */
#ifdef DEBUG_DISKQWRAP
			cmn_err(CE_NOTE, "!wrapping Q head: " QDISPLAY(q));
#endif
			/*LINTED*/
			WRAPQHEAD(q);
		}

		/* get rid of the iohdr */
		if (hp == q->iohdrs) {
			q->iohdrs = hp->dat.next;
			kmem_free(hp, sizeof (*hp));
			hp = q->iohdrs;
		} else {
			/* unlink from the middle/tail, fix tail pointer */
			if (hp == q->hdr_last)
				q->hdr_last = p;
			p->dat.next = hp->dat.next;
			kmem_free(hp, sizeof (*hp));
			hp = p->dat.next;
		}
		q->hdrcnt--;
	}

	/* queue fully drained: reset the on-disk header to defaults */
	if (QEMPTY(q) && !IS_QSTATE(q, RDC_QFULL) &&
	    !(IS_QSTATE(q, RDC_QDISABLEPEND))) {
#ifdef DEBUG_FLUSHER_UBERNOISE
		rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
		cmn_err(CE_NOTE, "!clr_iohdr: diskq %s empty, "
		    "resetting defaults", urdc->disk_queue);
#endif

		rdc_init_diskq_header(group, &q->disk_hdr);
		SET_QNXTIO(q, QHEAD(q));
	}

	/* wakeup any blocked enqueue threads */
	cv_broadcast(&q->qfullcv);
	mutex_exit(QLOCK(q));
}
1517 
1518 /*
1519  * put in whatever useful checks we can on the io header
1520  */
1521 int
1522 rdc_iohdr_ok(io_hdr *hdr)
1523 {
1524         if (hdr->dat.magic != RDC_IOHDR_MAGIC)
1525                 goto bad;
1526         return (1);
1527 bad:
1528 
1529 #ifdef DEBUG
1530         cmn_err(CE_WARN, "!Bad io header magic %x type %d pos %" NSC_SZFMT
1531             " hpos %" NSC_SZFMT " qpos %" NSC_SZFMT " len %" NSC_SZFMT
1532             " flag %d iostatus %d setid %d", hdr->dat.magic,
1533             hdr->dat.type, hdr->dat.pos, hdr->dat.hpos, hdr->dat.qpos,
1534             hdr->dat.len, hdr->dat.flag, hdr->dat.iostatus, hdr->dat.setid);
1535 #else
1536         cmn_err(CE_WARN, "!Bad io header retrieved");
1537 #endif
1538         return (0);
1539 }
1540 
1541 /*
1542  * rdc_netqueue_insert()
1543  * add an item to a netqueue. No locks necessary as it should only
1544  * be used in a single threaded manor. If that changes, then
1545  * a lock or assertion should be done here
1546  */
1547 void
1548 rdc_netqueue_insert(rdc_aio_t *aio, net_queue *q)
1549 {
1550         rdc_k_info_t *krdc = &rdc_k_info[aio->index];
1551 
1552         /* paranoid check for bit set */
1553         RDC_CHECK_BIT(krdc, aio->pos, aio->len);
1554 
1555         if (q->net_qhead == NULL) {
1556                 q->net_qhead = q->net_qtail = aio;
1557 
1558         } else {
1559                 q->net_qtail->next = aio;
1560                 q->net_qtail = aio;
1561         }
1562         q->blocks += aio->len;
1563         q->nitems++;
1564 
1565         if (q->nitems > q->nitems_hwm) {
1566                 q->nitems_hwm = q->nitems;
1567         }
1568         if (q->blocks > q->blocks_hwm) {
1569                 q->nitems_hwm = q->blocks;
1570         }
1571 }
1572 
1573 /*
1574  * rdc_fill_aio(aio, hdr)
1575  * take the pertinent info from an io_hdr and stick it in
1576  * an aio, including seq number, abuf.
1577  */
void
rdc_fill_aio(rdc_group_t *grp, rdc_aio_t *aio, io_hdr *hdr, nsc_buf_t *abuf)
{
	/* RDC_NULL_BUF headers carry no payload, so no data handle */
	if (hdr->dat.flag & RDC_NULL_BUF) {
		aio->handle = NULL;
	} else {
		aio->handle = abuf;
	}
	aio->qhandle = abuf;
	aio->pos = hdr->dat.pos;
	aio->qpos = hdr->dat.qpos;
	aio->len = hdr->dat.len;
	aio->flag = hdr->dat.flag;
	/* a negative index means the set id is gone or disabled */
	if ((aio->index = rdc_setid2idx(hdr->dat.setid)) < 0)
		return;
	mutex_enter(&grp->diskq.disk_qlock);
	if (grp->ra_queue.qfflags & RDC_QFILLSLEEP) {
		/* fill thread is being put to sleep: abandon this aio */
		mutex_exit(&grp->diskq.disk_qlock);
		aio->seq = RDC_NOSEQ;
		return;
	}
	/* take a reference on the shared queue buffer */
	if (abuf && aio->qhandle) {
		abuf->sb_user++;
	}
	aio->seq = grp->seq++;
	/* sequence counter wrapped: restart numbering past RDC_NEWSEQ */
	if (grp->seq < aio->seq)
		grp->seq = RDC_NEWSEQ + 1;
	mutex_exit(&grp->diskq.disk_qlock);
	/* remember the seq in the header so the ack path can match it */
	hdr->dat.iostatus = aio->seq;

}
1609 
1610 #ifdef DEBUG
int maxaios_perbuf = 0;	/* largest number of aios built from one buffer */
int midaios_perbuf = 0;	/* half of the maximum, for eyeballing the spread */
int aveaios_perbuf = 0;	/* running average of aios per buffer */
int totaios_perbuf = 0;	/* total aios over all calls; wraps negative */
int buf2qcalls = 0;	/* number of rdc_diskq_buf2queue passes counted */

/*
 * calc_perbuf
 * Fold the aio count from one buf2queue pass into the DEBUG
 * statistics above.  All counters reset once totaios_perbuf
 * overflows negative.  The caller increments buf2qcalls first,
 * but guard the division anyway so a zero count cannot trap.
 */
void
calc_perbuf(int items)
{
	if (totaios_perbuf < 0) {
		maxaios_perbuf = 0;
		midaios_perbuf = 0;
		aveaios_perbuf = 0;
		totaios_perbuf = 0;
		buf2qcalls = 0;
	}

	if (items > maxaios_perbuf)
		maxaios_perbuf = items;
	midaios_perbuf = maxaios_perbuf / 2;
	totaios_perbuf += items;
	/* avoid divide-by-zero when the counters were just reset */
	aveaios_perbuf = (buf2qcalls > 0) ? totaios_perbuf / buf2qcalls : 0;
}
1634 #endif
1635 
1636 /*
1637  * rdc_discard_tmpq()
1638  * free up the passed temporary queue
1639  * NOTE: no cv's or mutexes have been initialized
1640  */
1641 void
1642 rdc_discard_tmpq(net_queue *q)
1643 {
1644         rdc_aio_t *aio;
1645 
1646         if (q == NULL)
1647                 return;
1648 
1649         while (q->net_qhead) {
1650                 aio = q->net_qhead;
1651                 q->net_qhead = q->net_qhead->next;
1652                 if (aio->qhandle) {
1653                         aio->qhandle->sb_user--;
1654                         if (aio->qhandle->sb_user == 0) {
1655                                 rdc_fixlen(aio);
1656                                 (void) nsc_free_buf(aio->qhandle);
1657                         }
1658                 }
1659                 kmem_free(aio, sizeof (*aio));
1660                 q->nitems--;
1661         }
1662         kmem_free(q, sizeof (*q));
1663 
1664 }
1665 
1666 /*
1667  * rdc_diskq_buf2queue()
1668  * take a chunk of the diskq, parse it and assemble
1669  * a chain of rdc_aio_t's.
1670  * updates QNXTIO()
1671  */
net_queue *
rdc_diskq_buf2queue(rdc_group_t *grp, nsc_buf_t **abuf, int index)
{
	rdc_aio_t *aio = NULL;
	nsc_vec_t *vecp = NULL;
	uchar_t *vaddr = NULL;
	uchar_t *ioaddr = NULL;
	net_queue *netq = NULL;
	io_hdr	*hdr = NULL;
	nsc_buf_t *buf = *abuf;
	rdc_u_info_t *urdc = &rdc_u_info[index];
	rdc_k_info_t *krdc = &rdc_k_info[index];
	disk_queue *dq = &grp->diskq;
	net_queue *nq = &grp->ra_queue;
	int nullbuf = 0;
	nsc_off_t endobuf;	/* FBA just past the end of buf */
	nsc_off_t bufoff;	/* current parse position, in FBAs */
	int vlen;		/* bytes left in the current vector */
	nsc_off_t fpos;		/* offset of bufoff within buf, in FBAs */
	long bufcnt = 0;	/* FBAs consumed; used to advance QNXTIO */
	int nullblocks = 0;	/* payload FBAs of NULL_BUF entries seen */
	int fail = 1;		/* 1 = real failure, 0 = state change */

	if (buf == NULL)
		return (NULL);

	netq = kmem_zalloc(sizeof (*netq), KM_NOSLEEP);
	if (netq == NULL) {
		cmn_err(CE_WARN, "!SNDR: unable to allocate net queue");
		return (NULL);
	}

	/* start parsing at the first vector of the buffer */
	vecp = buf->sb_vec;
	vlen = vecp->sv_len;
	vaddr = vecp->sv_addr;
	bufoff = buf->sb_pos;
	endobuf = bufoff + buf->sb_len;

#ifdef DEBUG_FLUSHER_UBERNOISE
	cmn_err(CE_WARN, "!BUFFOFFENTER %d", bufoff);
#endif
	/* CONSTCOND */
	while (1) {
		/* bail out quietly on logging or a sleep request */
		if (IS_STATE(urdc, RDC_LOGGING) ||
		    (nq->qfflags & RDC_QFILLSLEEP)) {
			fail = 0;
			goto fail;
		}
#ifdef DEBUG_FLUSHER_UBERNOISE
		cmn_err(CE_WARN, "!BUFFOFF_0 %d", bufoff);
#endif

		if ((vaddr == NULL) || (vlen == 0))
			break;

		if (vlen <= 0) {
			vecp++;
			vaddr = vecp->sv_addr;
			vlen = vecp->sv_len;
			if (vaddr == NULL)
				break;
		}

		/* get the iohdr information */

		hdr = kmem_zalloc(sizeof (*hdr), KM_NOSLEEP);
		if (hdr == NULL) {
			cmn_err(CE_WARN,
			    "!SNDR: unable to alocate net queue header");
			goto fail;
		}

		ioaddr = (uchar_t *)hdr;

		bcopy(vaddr, ioaddr, sizeof (*hdr));

		if (!rdc_iohdr_ok(hdr)) {
			cmn_err(CE_WARN,
			    "!unable to retrieve i/o data from queue %s "
			    "at offset %" NSC_SZFMT " bp: %" NSC_SZFMT " bl: %"
			    NSC_SZFMT, urdc->disk_queue,
			    bufoff, buf->sb_pos, buf->sb_len);
#ifdef DEBUG_DISKQ
			cmn_err(CE_WARN, "!FAILING QUEUE state: %x",
			    rdc_get_vflags(urdc));
			cmn_err(CE_WARN, "!qinfo: " QDISPLAY(dq));
			cmn_err(CE_WARN, "!VADDR %p, IOADDR %p", vaddr, ioaddr);
			cmn_err(CE_WARN, "!BUF %p", buf);
#endif
			cmn_err(CE_WARN, "!qinfo: " QDISPLAYND(dq));

			goto fail;
		}

		nullbuf = hdr->dat.flag & RDC_NULL_BUF;

		bufoff += FBA_NUM(sizeof (*hdr));

		/* out of buffer, set nxtio to re read this last hdr */
		if (!nullbuf && ((bufoff + hdr->dat.len) > endobuf)) {
			break;
		}

		bufcnt += FBA_NUM(sizeof (*hdr));

		aio = kmem_zalloc(sizeof (*aio), KM_NOSLEEP);
		if (aio == NULL) {
			/* undo the header accounting before bailing */
			bufcnt -= FBA_NUM(sizeof (*hdr));
			cmn_err(CE_WARN, "!SNDR: net queue aio alloc failed");
			goto fail;
		}

		if (!nullbuf) {
			/* move to next iohdr in big buf */
			bufoff += hdr->dat.len;
			bufcnt += hdr->dat.len;
		}

		rdc_fill_aio(grp, aio, hdr, buf);

		if (aio->index < 0) {
			cmn_err(CE_WARN, "!Set id %d not found or no longer "
			    "enabled, failing disk queue", hdr->dat.setid);
			kmem_free(aio, sizeof (*aio));
			goto fail;
		}
		if (aio->seq == RDC_NOSEQ) {
			/* fill thread put to sleep mid-fill: state change */
			kmem_free(aio, sizeof (*aio));
			fail = 0;
			goto fail;
		}
		if (aio->handle == NULL)
			nullblocks += aio->len;

		rdc_add_iohdr(hdr, grp);
		hdr = NULL; /* don't accidentally free on break or fail */
		rdc_netqueue_insert(aio, netq);

		/* no more buffer, skip the below logic */
		if ((bufoff + FBA_NUM(sizeof (*hdr))) >= endobuf) {
			break;
		}

		/* re-derive the vector and byte position for bufoff */
		fpos = bufoff - buf->sb_pos;
		vecp = buf->sb_vec;
		for (; fpos >= FBA_NUM(vecp->sv_len); vecp++)
			fpos -= FBA_NUM(vecp->sv_len);
		vlen = vecp->sv_len - FBA_SIZE(fpos);
		vaddr = vecp->sv_addr + FBA_SIZE(fpos);
		/* abuf = NULL; */

	}

	/* free extraneous header */
	if (hdr) {
		kmem_free(hdr, sizeof (*hdr));
		hdr = NULL;
	}

	/*
	 * probably won't happen, but if we didn't goto fail, but
	 * we don't contain anything meaningful.. return NULL
	 * and let the flusher or the sleep/wakeup routines
	 * decide
	 */
	if (netq && netq->nitems == 0) {
		kmem_free(netq, sizeof (*netq));
		return (NULL);
	}

#ifdef DEBUG
	buf2qcalls++;
	calc_perbuf(netq->nitems);
#endif
	/* final state check before committing the QNXTIO advance */
	if (IS_STATE(urdc, RDC_LOGGING) ||
	    nq->qfflags & RDC_QFILLSLEEP) {
		fail = 0;
		goto fail;
	}

	mutex_enter(QLOCK(dq));
	INC_QNXTIO(dq, bufcnt);
	mutex_exit(QLOCK(dq));

	netq->net_qtail->orig_len = nullblocks; /* overload */

	return (netq);

fail:

	if (hdr) {
		kmem_free(hdr, sizeof (*hdr));
	}

	if (netq) {
		if (netq->nitems > 0) {
			/* the never can happen case ... */
			if ((netq->nitems == 1) &&
			    (netq->net_qhead->handle == NULL)) {
				(void) nsc_free_buf(buf);
				*abuf = NULL;
			}

		}
		rdc_discard_tmpq(netq);
	}

	/* drop the pending io_hdrs added during this pass */
	mutex_enter(QLOCK(dq));
	rdc_dump_iohdrs(dq);
	mutex_exit(QLOCK(dq));

	if (fail) { /* real failure, not just state change */
#ifdef DEBUG
		cmn_err(CE_WARN, "!rdc_diskq_buf2queue: failing disk queue %s",
		    urdc->disk_queue);
#endif
		rdc_fail_diskq(krdc, RDC_NOWAIT, RDC_DOLOG);
	}

	return (NULL);

}
1894 
1895 /*
1896  * rdc_diskq_unqueue
1897  * remove one chunk from the diskq belonging to
1898  * rdc_k_info[index]
1899  * updates the head and tail pointers in the disk header
1900  * but does not write. The header should be written on ack
1901  * flusher should free whatever..
1902  */
1903 rdc_aio_t *
1904 rdc_diskq_unqueue(int index)
1905 {
1906         int rc, rc1, rc2;
1907         nsc_off_t qhead;
1908         int nullhandle = 0;
1909         io_hdr *iohdr;
1910         rdc_aio_t *aio = NULL;
1911         nsc_buf_t *buf = NULL;
1912         nsc_buf_t *abuf = NULL;
1913         rdc_group_t *group = NULL;
1914         disk_queue *q = NULL;
1915         rdc_k_info_t *krdc = &rdc_k_info[index];
1916         rdc_u_info_t *urdc = &rdc_u_info[index];
1917 
1918         group = krdc->group;
1919         q = &group->diskq;
1920 
1921         if (group->diskqfd == NULL) /* we've been disabled */
1922                 return (NULL);
1923 
1924         aio = kmem_zalloc(sizeof (*aio), KM_NOSLEEP);
1925         if (!aio) {
1926                 return (NULL);
1927         }
1928 
1929         iohdr = kmem_zalloc(sizeof (*iohdr), KM_NOSLEEP);
1930         if (!iohdr) {
1931                 kmem_free(aio, sizeof (*aio));
1932                 return (NULL);
1933         }
1934 
1935         mutex_enter(QLOCK(q));
1936         rdc_set_qbusy(q); /* make sure no one disables the queue */
1937         mutex_exit(QLOCK(q));
1938 
1939         DTRACE_PROBE(rdc_diskq_unq_rsrv);
1940 
1941         if (_rdc_rsrv_diskq(group)) {
1942                 cmn_err(CE_WARN, "!rdc_unqueue: %s reserve failed",
1943                     urdc->disk_queue);
1944                 goto fail;
1945         }
1946 
1947         mutex_enter(QHEADLOCK(q));
1948         mutex_enter(QLOCK(q));
1949 
1950         if (IS_STATE(urdc, RDC_DISKQ_FAILED) || IS_STATE(urdc, RDC_LOGGING)) {
1951                 rdc_clr_qbusy(q);
1952                 mutex_exit(QLOCK(q));
1953                 mutex_exit(QHEADLOCK(q));
1954                 kmem_free(aio, sizeof (*aio));
1955                 kmem_free(iohdr, sizeof (*iohdr));
1956                 return (NULL);
1957         }
1958 
1959         if (QNXTIOSHLDWRAP(q)) {
1960 #ifdef DEBUG_DISKQWRAP
1961                 cmn_err(CE_NOTE, "!wrapping Q nxtio: " QDISPLAY(q));
1962 #endif
1963                 /*LINTED*/
1964                 WRAPQNXTIO(q);
1965         }
1966 
1967         /* read the metainfo at q->nxt_io first */
1968         if (QNXTIO(q) == QTAIL(q)) { /* empty */
1969 
1970                 _rdc_rlse_diskq(group);
1971                 if (q->lastio->handle)
1972                         (void) nsc_free_buf(q->lastio->handle);
1973                 bzero(&(*q->lastio), sizeof (*q->lastio));
1974 
1975                 mutex_exit(QHEADLOCK(q));
1976                 rdc_clr_qbusy(q);
1977                 mutex_exit(QLOCK(q));
1978                 kmem_free(aio, sizeof (*aio));
1979                 kmem_free(iohdr, sizeof (*iohdr));
1980                 return (NULL);
1981         }
1982 
1983         qhead = QNXTIO(q);
1984 
1985         /*
1986          * have to drop the lock here, sigh. Cannot block incoming io
1987          * we have to wait until after this read to find out how
1988          * much to increment QNXTIO. Might as well grab the seq then too
1989          */
1990 
1991         while ((qhead == LASTQTAIL(q)) && (IS_QSTATE(q, QTAILBUSY))) {
1992                 mutex_exit(QLOCK(q));
1993 #ifdef DEBUG_DISKQ
1994                 cmn_err(CE_NOTE, "!Qtail busy delay lastqtail: %d", qhead);
1995 #endif
1996                 delay(5);
1997                 mutex_enter(QLOCK(q));
1998         }
1999         mutex_exit(QLOCK(q));
2000 
2001         DTRACE_PROBE(rdc_diskq_iohdr_read_start);
2002 
2003         rc = rdc_ns_io(group->diskqfd, NSC_READ, qhead,
2004             (uchar_t *)iohdr, FBA_SIZE(1));
2005 
2006         DTRACE_PROBE(rdc_diskq_iohdr_read_end);
2007 
2008         if (!RDC_SUCCESS(rc) || !rdc_iohdr_ok(iohdr)) {
2009                 cmn_err(CE_WARN, "!unable to retrieve i/o data from queue %s"
2010                     " at offset %" NSC_SZFMT " rc %d", urdc->disk_queue,
2011                     qhead, rc);
2012 #ifdef DEBUG_DISKQ
2013                 cmn_err(CE_WARN, "!qinfo: " QDISPLAY(q));
2014 #endif
2015                 mutex_exit(QHEADLOCK(q));
2016                 goto fail;
2017         }
2018 
2019 /* XXX process buffer here, creating rdc_aio_t's */
2020 
2021         mutex_enter(QLOCK(q));
2022         /* update the next pointer */
2023         if (iohdr->dat.flag == RDC_NULL_BUF) {
2024                 INC_QNXTIO(q, FBA_LEN(sizeof (io_hdr)));
2025                 nullhandle = 1;
2026         } else {
2027                 INC_QNXTIO(q, (FBA_LEN(sizeof (io_hdr)) + iohdr->dat.len));
2028         }
2029 
2030         aio->seq = group->seq++;
2031         if (group->seq < aio->seq)
2032                 group->seq = RDC_NEWSEQ + 1;
2033 
2034         mutex_exit(QLOCK(q));
2035         mutex_exit(QHEADLOCK(q));
2036 
2037 #ifdef DEBUG_FLUSHER_UBERNOISE
2038         p = &iohdr->dat;
2039         cmn_err(CE_NOTE, "!unqueued iohdr from %d pos: %d len: %d flag: %d "
2040             "iostatus: %d setid: %d time: %d", qhead, p->pos, p->len,
2041             p->flag, p->iostatus, p->setid, p->time);
2042 #endif
2043 
2044         if (nullhandle) /* nothing to get from queue */
2045                 goto nullbuf;
2046 
2047         /* now that we know how much to get (iohdr.dat.len), get it */
2048         DTRACE_PROBE(rdc_diskq_unq_allocbuf1_start);
2049 
2050         rc = nsc_alloc_buf(group->diskqfd, qhead + 1, iohdr->dat.len,
2051             NSC_NOCACHE | NSC_READ, &buf);
2052 
2053         DTRACE_PROBE(rdc_diskq_unq_allocbuf1_end);
2054 
2055         /* and get somewhere to keep it for a bit */
2056         DTRACE_PROBE(rdc_diskq_unq_allocbuf2_start);
2057 
2058         rc1 = nsc_alloc_abuf(qhead + 1, iohdr->dat.len, 0, &abuf);
2059 
2060         DTRACE_PROBE(rdc_diskq_unq_allocbuf2_end);
2061 
2062         if (!RDC_SUCCESS(rc) || !RDC_SUCCESS(rc1)) { /* uh-oh */
2063                 cmn_err(CE_WARN, "!disk queue %s read failure",
2064                     urdc->disk_queue);
2065                 goto fail;
2066         }
2067 
2068         /* move it on over... */
2069         rc2 = nsc_copy(buf, abuf, qhead + 1, qhead + 1, iohdr->dat.len);
2070 
2071         if (!RDC_SUCCESS(rc2)) {
2072 #ifdef DEBUG
2073                 cmn_err(CE_WARN, "!nsc_copy failed for diskq unqueue");
2074 #endif
2075                 goto fail;
2076         }
2077 
2078         /* let go of the real buf, we've got the abuf  */
2079         (void) nsc_free_buf(buf);
2080         buf = NULL;
2081 
2082         aio->handle = abuf;
2083         /* Hack in the original sb_pos */
2084         aio->handle->sb_pos = iohdr->dat.hpos;
2085 
2086         /* skip the RDC_HANDLE_LIMITS check */
2087         abuf->sb_user |= RDC_DISKQUE;
2088 
2089 nullbuf:
2090         if (nullhandle) {
2091                 aio->handle = NULL;
2092         }
2093 
2094         /* set up the rest of the aio values, seq set above ... */
2095         aio->pos = iohdr->dat.pos;
2096         aio->qpos = iohdr->dat.qpos;
2097         aio->len = iohdr->dat.len;
2098         aio->flag = iohdr->dat.flag;
2099         aio->index = rdc_setid2idx(iohdr->dat.setid);
2100         if (aio->index < 0) { /* uh-oh */
2101 #ifdef DEBUG
2102                 cmn_err(CE_WARN, "!rdc_diskq_unqueue: index < 0");
2103 #endif
2104                 goto fail;
2105         }
2106 
2107 
2108 #ifdef DEBUG_FLUSHER_UBERNOISE_STAMP
2109         h = &q->disk_hdr.h;
2110         cmn_err(CE_NOTE, "!stamping diskq header:\n"
2111             "magic: %x\nstate: %d\nhead_offset: %d\n"
2112             "tail_offset: %d\ndisk_size: %d\nnitems: %d\nblocks: %d\n",
2113             h->magic, h->state, h->head_offset, h->tail_offset,
2114             h->disk_size, h->nitems, h->blocks);
2115 #endif
2116 
2117         _rdc_rlse_diskq(group);
2118 
2119         mutex_enter(QLOCK(q));
2120         rdc_clr_qbusy(q);
2121         mutex_exit(QLOCK(q));
2122 
2123         DTRACE_PROBE(rdc_diskq_unq_rlse);
2124 
2125         iohdr->dat.iostatus = aio->seq;
2126         rdc_add_iohdr(iohdr, group);
2127 
2128 #ifdef DEBUG_FLUSHER_UBERNOISE
2129         if (!nullhandle) {
2130                 cmn_err(CE_NOTE, "!UNQUEUING, %p"
2131                     " contents: %c%c%c%c%c pos: %d len: %d",
2132                     (void *)aio->handle,
2133                     aio->handle->sb_vec[0].sv_addr[0],
2134                     aio->handle->sb_vec[0].sv_addr[1],
2135                     aio->handle->sb_vec[0].sv_addr[2],
2136                     aio->handle->sb_vec[0].sv_addr[3],
2137                     aio->handle->sb_vec[0].sv_addr[4],
2138                     aio->handle->sb_pos, aio->handle->sb_len);
2139         } else {
2140                 cmn_err(CE_NOTE, "!UNQUEUING, NULL " QDISPLAY(q));
2141         }
2142         cmn_err(CE_NOTE, "!qinfo: " QDISPLAY(q));
2143 #endif
2144 
2145         return (aio);
2146 
2147 fail:
2148         if (aio)
2149                 kmem_free(aio, sizeof (*aio));
2150         if (iohdr)
2151                 kmem_free(iohdr, sizeof (*iohdr));
2152         if (buf)
2153                 (void) nsc_free_buf(buf);
2154         if (abuf)
2155                 (void) nsc_free_buf(abuf);
2156 
2157         _rdc_rlse_diskq(group);
2158 #ifdef DEBUG
2159         cmn_err(CE_WARN, "!diskq_unqueue: failing diskq");
2160 #endif
2161         mutex_enter(QLOCK(q));
2162         rdc_clr_qbusy(q);
2163         mutex_exit(QLOCK(q));
2164 
2165         rdc_fail_diskq(krdc, RDC_NOWAIT, RDC_DOLOG);
2166 
2167         return (NULL);
2168 }
2169 
2170 int
2171 rdc_diskq_inuse(rdc_set_t *set, char *diskq)
2172 {
2173         rdc_u_info_t *urdc;
2174         char *group;
2175         int index;
2176 
2177         group = set->group_name;
2178 
2179         ASSERT(MUTEX_HELD(&rdc_conf_lock));
2180 
2181         if ((rdc_lookup_bitmap(diskq) >= 0) ||
2182             (rdc_lookup_configured(diskq) >= 0)) {
2183                 return (1);
2184         }
2185         for (index = 0; index < rdc_max_sets; index++) {
2186                 urdc = &rdc_u_info[index];
2187 
2188                 if (!IS_ENABLED(urdc))
2189                         continue;
2190 
2191                 /* same diskq different group */
2192                 if ((strcmp(urdc->disk_queue, diskq) == 0) &&
2193                     (urdc->group_name[0] == '\0' ||
2194                     strcmp(urdc->group_name, group))) {
2195                         return (1);
2196                 }
2197         }
2198         /* last, but not least, lets see if someone is getting really funky */
2199         if ((strcmp(set->disk_queue, set->primary.file) == 0) ||
2200             (strcmp(set->disk_queue, set->primary.bitmap) == 0)) {
2201                 return (1);
2202         }
2203 
2204         return (0);
2205 
2206 }
2207 
2208 #ifdef DEBUG
int maxlen = 0;         /* largest sample seen since the last reset */
int avelen = 0;         /* running average of the samples */
int totalen = 0;        /* sum of all samples since the last reset */
int lencalls = 0;       /* sample count; incremented by the caller */

/*
 * update_lenstats()
 * fold one dequeue-length sample into the DEBUG statistics above.
 * A zeroed lencalls means "start over", so the counters are reseeded
 * before the sample is accumulated.
 */
void
update_lenstats(int len)
{
        if (lencalls == 0) {
                /* fresh run: reset all accumulators */
                lencalls = 1;
                avelen = 0;
                maxlen = 0;
                totalen = 0;
        }

        if (maxlen < len)
                maxlen = len;
        totalen += len;
        avelen = totalen / lencalls;
}
2229 #endif
2230 
2231 /*
2232  * rdc_calc_len()
2233  * returns the size of the diskq that can be read for dequeuing
2234  * always <= RDC_MAX_DISKQREAD
2235  */
2236 int
2237 rdc_calc_len(rdc_k_info_t *krdc, disk_queue *dq)
2238 {
2239         nsc_size_t len = 0;
2240 
2241         ASSERT(MUTEX_HELD(QLOCK(dq)));
2242 
2243         /* ---H-----N-----T--- */
2244         if (QNXTIO(dq) < QTAIL(dq)) {
2245 
2246                 len = min(RDC_MAX_DISKQREAD, QTAIL(dq) - QNXTIO(dq));
2247 
2248         /* ---T-----H-----N--- */
2249         } else if (QNXTIO(dq) > QTAIL(dq)) {
2250                 if (QWRAP(dq)) {
2251                         len = min(RDC_MAX_DISKQREAD, QWRAP(dq) - QNXTIO(dq));
2252                 } else { /* should never happen */
2253                         len = min(RDC_MAX_DISKQREAD, QSIZE(dq) - QNXTIO(dq));
2254                 }
2255         } else if (QNXTIO(dq) == QTAIL(dq)) {
2256                 if (QWRAP(dq) && !IS_QSTATE(dq, QNXTIOWRAPD))
2257                         len = min(RDC_MAX_DISKQREAD, QWRAP(dq) - QNXTIO(dq));
2258         }
2259 
2260         len = min(len, krdc->maxfbas);
2261 
2262 #ifdef DEBUG
2263         lencalls++;
2264         update_lenstats(len);
2265 #endif
2266 
2267         return ((int)len);
2268 }
2269 
2270 /*
2271  * lie a little if we can, so we don't get tied up in
2272  * _nsc_wait_dbuf() on the next read. sb_len MUST be
2273  * restored before nsc_free_buf() however, or we will
2274  * be looking at memory leak city..
2275  * so update the entire queue with the info as well
2276  * and the one that ends up freeing it, can fix the len
2277  * IMPORTANT: This assumes that we are not cached, in
2278  * 3.2 caching was turned off for data volumes, if that
2279  * changes, then this must too
2280  */
2281 void
2282 rdc_trim_buf(nsc_buf_t *buf, net_queue *q)
2283 {
2284         rdc_aio_t *p;
2285         int len;
2286 
2287         if (buf == NULL || q == NULL)
2288                 return;
2289 
2290         if (q && (buf->sb_len >
2291             (q->blocks + q->nitems - q->net_qtail->orig_len))) {
2292                 len = buf->sb_len;
2293                 buf->sb_len = (q->blocks + q->nitems - q->net_qtail->orig_len);
2294         }
2295 
2296         p = q->net_qhead;
2297         do {
2298                 p->orig_len = len;
2299                 p = p->next;
2300 
2301         } while (p);
2302 
2303 }
2304 
2305 /*
2306  * rdc_read_diskq_buf()
2307  * read a large as possible chunk of the diskq into a nsc_buf_t
2308  * and convert it to a net_queue of rdc_aio_t's to be appended
2309  * to the group's netqueue
2310  */
2311 net_queue *
2312 rdc_read_diskq_buf(int index)
2313 {
2314         nsc_buf_t *buf = NULL;
2315         net_queue *tmpnq = NULL;
2316         disk_queue *dq = NULL;
2317         rdc_k_info_t *krdc = &rdc_k_info[index];
2318         rdc_u_info_t *urdc = &rdc_u_info[index];
2319         rdc_group_t *group = krdc->group;
2320         net_queue *nq = &group->ra_queue;
2321         int len = 0;
2322         int rc;
2323         int fail = 0;
2324         int offset = 0;
2325 
2326         if (group == NULL || group->diskqfd == NULL) {
2327                 DTRACE_PROBE(rdc_read_diskq_buf_bail1);
2328                 return (NULL);
2329         }
2330 
2331         dq = &group->diskq;
2332 
2333         mutex_enter(QLOCK(dq));
2334         rdc_set_qbusy(dq); /* prevent disables on the queue */
2335         mutex_exit(QLOCK(dq));
2336 
2337         if (_rdc_rsrv_diskq(group)) {
2338                 cmn_err(CE_WARN, "!rdc_readdiskqbuf: %s reserve failed",
2339                     urdc->disk_queue);
2340                 mutex_enter(QLOCK(dq));
2341                 rdc_clr_qbusy(dq); /* prevent disables on the queue */
2342                 mutex_exit(QLOCK(dq));
2343                 return (NULL);
2344         }
2345 
2346         mutex_enter(QHEADLOCK(dq));
2347         mutex_enter(QLOCK(dq));
2348 
2349         if (IS_STATE(urdc, RDC_DISKQ_FAILED) ||
2350             IS_STATE(urdc, RDC_LOGGING) ||
2351             (nq->qfflags & RDC_QFILLSLEEP)) {
2352                 mutex_exit(QLOCK(dq));
2353                 mutex_exit(QHEADLOCK(dq));
2354                 DTRACE_PROBE(rdc_read_diskq_buf_bail2);
2355                 goto done;
2356         }
2357 
2358         /*
2359          * real corner case here, we need to let the flusher wrap first.
2360          * we've gotten too far ahead, so just delay and try again
2361          */
2362         if (IS_QSTATE(dq, QNXTIOWRAPD) && AUXQWRAP(dq)) {
2363                 mutex_exit(QLOCK(dq));
2364                 mutex_exit(QHEADLOCK(dq));
2365                 goto done;
2366         }
2367 
2368         if (QNXTIOSHLDWRAP(dq)) {
2369 #ifdef DEBUG_DISKQWRAP
2370                 cmn_err(CE_NOTE, "!wrapping Q nxtio: " QDISPLAY(dq));
2371 #endif
2372                 /*LINTED*/
2373                 WRAPQNXTIO(dq);
2374         }
2375 
2376         /* read the metainfo at q->nxt_io first */
2377         if (!QNITEMS(dq)) { /* empty */
2378 
2379                 if (dq->lastio->handle)
2380                         (void) nsc_free_buf(dq->lastio->handle);
2381                 bzero(&(*dq->lastio), sizeof (*dq->lastio));
2382                 mutex_exit(QLOCK(dq));
2383                 mutex_exit(QHEADLOCK(dq));
2384                 DTRACE_PROBE(rdc_read_diskq_buf_bail3);
2385                 goto done;
2386         }
2387 
2388 
2389         len = rdc_calc_len(krdc, dq);
2390 
2391         if ((len <= 0) || (IS_STATE(urdc, RDC_LOGGING)) ||
2392             (IS_STATE(urdc, RDC_DISKQ_FAILED)) ||
2393             (nq->qfflags & RDC_QFILLSLEEP)) {
2394                 mutex_exit(QLOCK(dq));
2395                 mutex_exit(QHEADLOCK(dq));
2396                 /*
2397                  * a write could be trying to get on the queue, or if
2398                  * the queue is really really small, a complete image
2399                  * of it could be on the net queue waiting for flush.
2400                  * the latter being a fairly stupid scenario and a gross
2401                  * misconfiguration.. but what the heck, why make the thread
2402                  * thrash around.. just pause a little here.
2403                  */
2404                 if (len <= 0)
2405                         delay(50);
2406 
2407                 DTRACE_PROBE3(rdc_read_diskq_buf_bail4, int, len,
2408                     int, rdc_get_vflags(urdc), int, nq->qfflags);
2409 
2410                 goto done;
2411         }
2412 
2413         DTRACE_PROBE2(rdc_calc_len, int, len, int, (int)QNXTIO(dq));
2414 
2415 #ifdef DEBUG_FLUSHER_UBERNOISE
2416         cmn_err(CE_WARN, "!CALC_LEN(%d) h:%d n%d t%d, w%d",
2417             len, QHEAD(dq), QNXTIO(dq), QTAIL(dq), QWRAP(dq));
2418         cmn_err(CE_CONT, "!qinfo: " QDISPLAYND(dq));
2419 #endif
2420         SET_QCOALBOUNDS(dq, QNXTIO(dq) + len);
2421 
2422         while ((LASTQTAIL(dq) > 0) && !QWRAP(dq) &&
2423             ((QNXTIO(dq) + len) >= LASTQTAIL(dq)) &&
2424             (IS_QSTATE(dq, QTAILBUSY))) {
2425                 mutex_exit(QLOCK(dq));
2426 
2427 #ifdef DEBUG_FLUSHER_UBERNOISE
2428                 cmn_err(CE_NOTE, "!Qtail busy delay nxtio %d len %d "
2429                     "lastqtail: %d", QNXTIO(dq), len, LASTQTAIL(dq));
2430 #endif
2431                 delay(20);
2432                 mutex_enter(QLOCK(dq));
2433         }
2434 
2435         offset = QNXTIO(dq);
2436 
2437         /*
2438          * one last check to see if we have gone logging, or should.
2439          * we may have released the mutex above, so check again
2440          */
2441         if ((IS_STATE(urdc, RDC_LOGGING)) ||
2442             (IS_STATE(urdc, RDC_DISKQ_FAILED)) ||
2443             (nq->qfflags & RDC_QFILLSLEEP)) {
2444                 mutex_exit(QLOCK(dq));
2445                 mutex_exit(QHEADLOCK(dq));
2446                 goto done;
2447         }
2448 
2449         mutex_exit(QLOCK(dq));
2450         mutex_exit(QHEADLOCK(dq));
2451 
2452         DTRACE_PROBE2(rdc_buf2q_preread, int, offset, int, len);
2453 
2454         rc = nsc_alloc_buf(group->diskqfd, offset, len,
2455             NSC_NOCACHE | NSC_READ, &buf);
2456 
2457         if (!RDC_SUCCESS(rc)) {
2458                 cmn_err(CE_WARN, "!disk queue %s read failure pos %" NSC_SZFMT
2459                     " len %d", urdc->disk_queue, QNXTIO(dq), len);
2460                 fail++;
2461                 buf = NULL;
2462                 DTRACE_PROBE(rdc_read_diskq_buf_bail5);
2463                 goto done;
2464         }
2465 
2466         DTRACE_PROBE2(rdc_buf2q_postread, int, offset, nsc_size_t, buf->sb_len);
2467 
2468         /*
2469          * convert buf to a net_queue. buf2queue will
2470          * update the QNXTIO pointer for us, based on
2471          * the last readable queue item
2472          */
2473         tmpnq = rdc_diskq_buf2queue(group, &buf, index);
2474 
2475 #ifdef DEBUG_FLUSHER_UBERNOISE
2476         cmn_err(CE_NOTE, "!QBUF p: %d l: %d p+l: %d users: %d qblocks: %d ",
2477             "qitems: %d WASTED: %d", buf->sb_pos, buf->sb_len,
2478             buf->sb_pos+buf->sb_len, buf->sb_user, tmpnq?tmpnq->blocks:-1,
2479             tmpnq?tmpnq->nitems:-1,
2480             tmpnq?((buf->sb_len-tmpnq->nitems) - tmpnq->blocks):-1);
2481 #endif
2482 
2483         DTRACE_PROBE3(rdc_buf2que_returned, net_queue *, tmpnq?tmpnq:0,
2484             uint64_t, tmpnq?tmpnq->nitems:0,
2485             uint_t, tmpnq?tmpnq->net_qhead->seq:0);
2486 done:
2487 
2488         /* we don't need to retain the buf */
2489         if (tmpnq == NULL)
2490                 if (buf) {
2491                         (void) nsc_free_buf(buf);
2492                         buf = NULL;
2493                 }
2494 
2495         rdc_trim_buf(buf, tmpnq);
2496 
2497         mutex_enter(QLOCK(dq));
2498         rdc_clr_qbusy(dq);
2499         mutex_exit(QLOCK(dq));
2500 
2501         _rdc_rlse_diskq(group);
2502 
2503         if (fail) {
2504                 rdc_fail_diskq(krdc, RDC_NOWAIT, RDC_DOLOG);
2505                 tmpnq = NULL;
2506         }
2507 
2508         return (tmpnq);
2509 }
2510 
2511 /*
2512  * rdc_dequeue()
2513  * removes the head of the memory queue
2514  */
2515 rdc_aio_t *
2516 rdc_dequeue(rdc_k_info_t *krdc, int *rc)
2517 {
2518         net_queue *q = &krdc->group->ra_queue;
2519         disk_queue *dq = &krdc->group->diskq;
2520         rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
2521         rdc_aio_t *aio;
2522 
2523         *rc = 0;
2524 
2525         if (q == NULL)
2526                 return (NULL);
2527 
2528         mutex_enter(&q->net_qlock);
2529 
2530         aio = q->net_qhead;
2531 
2532         if (aio == NULL) {
2533 #ifdef DEBUG
2534                 if (q->nitems != 0 || q->blocks != 0 || q->net_qtail != 0) {
2535                         cmn_err(CE_PANIC,
2536                             "rdc_dequeue(1): q %p, q blocks %" NSC_SZFMT
2537                             " , nitems %" NSC_SZFMT ", qhead %p qtail %p",
2538                             (void *) q, q->blocks, q->nitems,
2539                             (void *) aio, (void *) q->net_qtail);
2540                 }
2541 #endif
2542 
2543                 mutex_exit(&q->net_qlock);
2544 
2545                 if ((!IS_STATE(urdc, RDC_LOGGING)) &&
2546                     (!(q->qfflags & RDC_QFILLSLEEP)) &&
2547                     (!IS_STATE(urdc, RDC_SYNCING)) && (QNITEMS(dq) > 0)) {
2548                         *rc = EAGAIN;
2549                 }
2550 
2551                 goto done;
2552         }
2553 
2554         /* aio remove from q */
2555 
2556         q->net_qhead = aio->next;
2557         aio->next = NULL;
2558 
2559         if (q->net_qtail == aio)
2560                 q->net_qtail = q->net_qhead;
2561 
2562         q->blocks -= aio->len;
2563         q->nitems--;
2564 
2565 #ifdef DEBUG
2566         if (q->net_qhead == NULL) {
2567                 if (q->nitems != 0 || q->blocks != 0 || q->net_qtail != 0) {
2568                         cmn_err(CE_PANIC, "rdc_dequeue(2): q %p, q blocks %"
2569                             NSC_SZFMT " nitems %" NSC_SZFMT
2570                             " , qhead %p qtail %p",
2571                             (void *) q, q->blocks, q->nitems,
2572                             (void *) q->net_qhead, (void *) q->net_qtail);
2573                 }
2574         }
2575 #endif
2576         mutex_exit(&q->net_qlock);
2577 done:
2578 
2579         mutex_enter(&q->net_qlock);
2580 
2581         if (rdc_qfill_shldwakeup(krdc))
2582                 cv_broadcast(&q->qfcv);
2583 
2584         /*
2585          * clear EAGAIN if
2586          * logging or q filler thread is sleeping or stopping altogether
2587          * or if q filler thread is dead already
2588          * or if syncing, this will return a null aio, with no error code set
2589          * telling the flusher to die
2590          */
2591         if (*rc == EAGAIN) {
2592                 if (IS_STATE(urdc, RDC_LOGGING) ||
2593                     (q->qfflags & (RDC_QFILLSLEEP | RDC_QFILLSTOP)) ||
2594                     (IS_QSTATE(dq, (RDC_QDISABLEPEND | RDC_STOPPINGFLUSH))) ||
2595                     (q->qfill_sleeping == RDC_QFILL_DEAD) ||
2596                     (IS_STATE(urdc, RDC_SYNCING)))
2597                         *rc = 0;
2598         }
2599 
2600         mutex_exit(&q->net_qlock);
2601 
2602         return (aio);
2603 
2604 }
2605 
2606 /*
2607  * rdc_qfill_shldsleep()
2608  * returns 1 if the qfilling code should cv_wait() 0 if not.
2609  * reasons for going into cv_wait();
2610  * there is nothing in the diskq to flush to mem.
2611  * the memory queue has gotten too big and needs more flushing attn.
2612  */
2613 int
2614 rdc_qfill_shldsleep(rdc_k_info_t *krdc)
2615 {
2616         net_queue *nq = &krdc->group->ra_queue;
2617         disk_queue *dq = &krdc->group->diskq;
2618         rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
2619 
2620         ASSERT(MUTEX_HELD(&nq->net_qlock));
2621 
2622         if (!RDC_IS_DISKQ(krdc->group))
2623                 return (1);
2624 
2625         if (nq->qfflags & RDC_QFILLSLEEP) {
2626 #ifdef DEBUG_DISKQ_NOISY
2627         cmn_err(CE_NOTE, "!Sleeping diskq->memq flusher: QFILLSLEEP idx: %d",
2628             krdc->index);
2629 #endif
2630                 return (1);
2631         }
2632 
2633         if (IS_STATE(urdc, RDC_LOGGING) || IS_STATE(urdc, RDC_SYNCING)) {
2634 #ifdef DEBUG_DISKQ_NOISY
2635         cmn_err(CE_NOTE, "!Sleeping diskq->memq flusher: Sync|Log (0x%x)"
2636             " idx: %d", rdc_get_vflags(urdc), urdc->index);
2637 #endif
2638                 return (1);
2639         }
2640 
2641         mutex_enter(QLOCK(dq));
2642         if ((QNXTIO(dq) == QTAIL(dq)) && !IS_QSTATE(dq, RDC_QFULL)) {
2643 #ifdef DEBUG_DISKQ_NOISY
2644                 cmn_err(CE_NOTE, "!Sleeping diskq->memq flusher: QEMPTY");
2645 #endif
2646                 mutex_exit(QLOCK(dq));
2647                 return (1);
2648         }
2649         mutex_exit(QLOCK(dq));
2650 
2651         if (nq->blocks >= RDC_MAX_QBLOCKS) {
2652                 nq->hwmhit = 1;
2653                 /* stuck flushers ? */
2654 #ifdef DEBUG_DISKQ_NOISY
2655                 cmn_err(CE_NOTE, "!Sleeping diskq->memq flusher: memq full:"
2656                     " seq: %d seqack %d", krdc->group->seq,
2657                     krdc->group->seqack);
2658 #endif
2659                 return (1);
2660         }
2661 
2662         return (0);
2663 }
2664 
2665 /*
2666  * rdc_join_netqueues(a, b)
2667  * appends queue b to queue a updating all the queue info
2668  * as it is assumed queue a is the important one,
2669  * it's mutex must be held. no one can add to queue b
2670  */
2671 void
2672 rdc_join_netqueues(net_queue *q, net_queue *tmpq)
2673 {
2674         ASSERT(MUTEX_HELD(&q->net_qlock));
2675 
2676         if (q->net_qhead == NULL) { /* empty */
2677 #ifdef DEBUG
2678                 if (q->blocks != 0 || q->nitems != 0) {
2679                         cmn_err(CE_PANIC, "rdc filler: q %p, qhead 0, "
2680                             " q blocks %" NSC_SZFMT ", nitems %" NSC_SZFMT,
2681                             (void *) q, q->blocks, q->nitems);
2682                 }
2683 #endif
2684                 q->net_qhead = tmpq->net_qhead;
2685                 q->net_qtail = tmpq->net_qtail;
2686                 q->nitems = tmpq->nitems;
2687                 q->blocks = tmpq->blocks;
2688         } else {
2689                 q->net_qtail->next = tmpq->net_qhead;
2690                 q->net_qtail = tmpq->net_qtail;
2691                 q->nitems += tmpq->nitems;
2692                 q->blocks += tmpq->blocks;
2693         }
2694 
2695         if (q->nitems > q->nitems_hwm) {
2696                 q->nitems_hwm = q->nitems;
2697         }
2698 
2699         if (q->blocks > q->blocks_hwm) {
2700                 q->blocks_hwm = q->blocks;
2701         }
2702 }
2703 
2704 /*
2705  * rdc_qfiller_thr() single thread that moves
2706  * data from the diskq to a memory queue for
2707  * the flusher to pick up.
2708  */
2709 void
2710 rdc_qfiller_thr(rdc_k_info_t *krdc)
2711 {
2712         rdc_group_t *grp = krdc->group;
2713         rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
2714         net_queue *q = &grp->ra_queue;
2715         net_queue *tmpq = NULL;
2716         int index = krdc->index;
2717 
2718         q->qfill_sleeping = RDC_QFILL_AWAKE;
2719         while (!(q->qfflags & RDC_QFILLSTOP)) {
2720                 if (!RDC_IS_DISKQ(grp) ||
2721                     IS_STATE(urdc, RDC_LOGGING) ||
2722                     IS_STATE(urdc, RDC_DISKQ_FAILED) ||
2723                     (q->qfflags & RDC_QFILLSLEEP)) {
2724                         goto nulltmpq;
2725                 }
2726 
2727                 DTRACE_PROBE(qfiller_top);
2728                 tmpq = rdc_read_diskq_buf(index);
2729 
2730                 if (tmpq == NULL)
2731                         goto nulltmpq;
2732 
2733                 if ((q->qfflags & RDC_QFILLSLEEP) ||
2734                     IS_STATE(urdc, RDC_LOGGING)) {
2735                         rdc_discard_tmpq(tmpq);
2736                         goto nulltmpq;
2737                 }
2738 
2739                 mutex_enter(&q->net_qlock);
2740 
2741                 /* race with log, redundant yet paranoid */
2742                 if ((q->qfflags & RDC_QFILLSLEEP) ||
2743                     IS_STATE(urdc, RDC_LOGGING)) {
2744                         rdc_discard_tmpq(tmpq);
2745                         mutex_exit(&q->net_qlock);
2746                         goto nulltmpq;
2747                 }
2748 
2749 
2750                 rdc_join_netqueues(q, tmpq);
2751                 kmem_free(tmpq, sizeof (*tmpq));
2752                 tmpq = NULL;
2753 
2754                 mutex_exit(&q->net_qlock);
2755 nulltmpq:
2756                 /*
2757                  * sleep for a while if we can.
2758                  * the enqueuing or flushing code will
2759                  * wake us if if necessary.
2760                  */
2761                 mutex_enter(&q->net_qlock);
2762                 while (rdc_qfill_shldsleep(krdc)) {
2763                         q->qfill_sleeping = RDC_QFILL_ASLEEP;
2764                         DTRACE_PROBE(qfiller_sleep);
2765                         cv_wait(&q->qfcv, &q->net_qlock);
2766                         DTRACE_PROBE(qfiller_wakeup);
2767                         q->qfill_sleeping = RDC_QFILL_AWAKE;
2768                         if (q->qfflags & RDC_QFILLSTOP) {
2769 #ifdef DEBUG_DISKQ
2770                         cmn_err(CE_NOTE,
2771                             "!rdc_qfiller_thr: recieved kill signal");
2772 #endif
2773                                 mutex_exit(&q->net_qlock);
2774                                 goto done;
2775                         }
2776                 }
2777                 mutex_exit(&q->net_qlock);
2778 
2779         DTRACE_PROBE(qfiller_bottom);
2780         }
2781 done:
2782         DTRACE_PROBE(qfiller_done);
2783         q->qfill_sleeping = RDC_QFILL_DEAD; /* the big sleep */
2784 
2785 #ifdef DEBUG
2786         cmn_err(CE_NOTE, "!rdc_qfiller_thr stopping");
2787 #endif
2788         q->qfflags &= ~RDC_QFILLSTOP;
2789 
2790 }
2791 
2792 int
2793 _rdc_add_diskq(int index, char *diskq)
2794 {
2795         rdc_k_info_t *krdc, *kp;
2796         rdc_u_info_t *urdc, *up;
2797         rdc_group_t *group;
2798         int rc;
2799 
2800         krdc = &rdc_k_info[index];
2801         urdc = &rdc_u_info[index];
2802         group = krdc->group;
2803 
2804         if (!diskq || urdc->disk_queue[0]) { /* how'd that happen? */
2805 #ifdef DEBUG
2806                 cmn_err(CE_WARN, "!NULL diskq in _rdc_add_diskq");
2807 #endif
2808                 rc = -1;
2809                 goto fail;
2810         }
2811 
2812         /* if the enable fails, this is bzero'ed */
2813         (void) strncpy(urdc->disk_queue, diskq, NSC_MAXPATH);
2814         group->flags &= ~RDC_MEMQUE;
2815         group->flags |= RDC_DISKQUE;
2816 
2817 #ifdef DEBUG
2818         cmn_err(CE_NOTE, "!adding diskq to group %s", urdc->group_name);
2819 #endif
2820         mutex_enter(&rdc_conf_lock);
2821         rc = rdc_enable_diskq(krdc);
2822         mutex_exit(&rdc_conf_lock);
2823 
2824         if (rc == RDC_EQNOADD) {
2825                 goto fail;
2826         }
2827 
2828         RDC_ZERO_BITREF(krdc);
2829         for (kp = krdc->group_next; kp != krdc; kp = kp->group_next) {
2830                 up = &rdc_u_info[kp->index];
2831                 (void) strncpy(up->disk_queue, diskq, NSC_MAXPATH);
2832                 /* size lives in the diskq structure, already set by enable */
2833                 RDC_ZERO_BITREF(kp);
2834         }
2835 
2836 fail:
2837         return (rc);
2838 
2839 }
2840 
2841 /*
2842  * add a diskq to an existing set/group
2843  */
2844 int
2845 rdc_add_diskq(rdc_config_t *uparms, spcs_s_info_t kstatus)
2846 {
2847         char *diskq;
2848         int rc;
2849         int index;
2850         rdc_k_info_t *krdc, *this;
2851         rdc_u_info_t *urdc;
2852         rdc_group_t *group;
2853         nsc_size_t vol_size = 0;
2854         nsc_size_t req_size = 0;
2855 
2856         mutex_enter(&rdc_conf_lock);
2857         index = rdc_lookup_byname(uparms->rdc_set);
2858         mutex_exit(&rdc_conf_lock);
2859         if (index < 0) {
2860                 spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
2861                     uparms->rdc_set->secondary.file);
2862                 rc = RDC_EALREADY;
2863                 goto failed;
2864         }
2865         urdc = &rdc_u_info[index];
2866         krdc = &rdc_k_info[index];
2867         this = &rdc_k_info[index];
2868         group = krdc->group;
2869         diskq = uparms->rdc_set->disk_queue;
2870 
2871         if (!IS_ASYNC(urdc)) {
2872                 spcs_s_add(kstatus, RDC_EQWRONGMODE, urdc->primary.intf,
2873                     urdc->primary.file, urdc->secondary.intf,
2874                     urdc->secondary.file);
2875                 rc = RDC_EQNOQUEUE;
2876                 goto failed;
2877         }
2878 
2879         do {
2880                 if (!IS_STATE(urdc, RDC_LOGGING)) {
2881                         spcs_s_add(kstatus, RDC_EQNOTLOGGING,
2882                             uparms->rdc_set->disk_queue);
2883                         rc = RDC_EQNOTLOGGING;
2884                         goto failed;
2885                 }
2886                 /* make sure that we have enough bitmap vol */
2887                 req_size = RDC_BITMAP_FBA + FBA_LEN(krdc->bitmap_size);
2888                 req_size += FBA_LEN(krdc->bitmap_size * BITS_IN_BYTE);
2889 
2890                 rc = _rdc_rsrv_devs(krdc, RDC_BMP, RDC_INTERNAL);
2891 
2892                 if (!RDC_SUCCESS(rc)) {
2893                         cmn_err(CE_WARN,
2894                             "!rdc_open_diskq: Bitmap reserve failed");
2895                         spcs_s_add(kstatus, RDC_EBITMAP,
2896                             urdc->primary.bitmap);
2897                         rc = RDC_EBITMAP;
2898                         goto failed;
2899                 }
2900 
2901                 (void) nsc_partsize(krdc->bitmapfd, &vol_size);
2902 
2903                 _rdc_rlse_devs(krdc, RDC_BMP);
2904 
2905                 if (vol_size < req_size) {
2906                         spcs_s_add(kstatus, RDC_EBITMAP2SMALL,
2907                             urdc->primary.bitmap);
2908                         rc = RDC_EBITMAP2SMALL;
2909                         goto failed;
2910                 }
2911 
2912                 krdc = krdc->group_next;
2913                 urdc = &rdc_u_info[krdc->index];
2914 
2915         } while (krdc != this);
2916 
2917         if (urdc->disk_queue[0] != '\0') {
2918                 spcs_s_add(kstatus, RDC_EQALREADY, urdc->primary.intf,
2919                     urdc->primary.file, urdc->secondary.intf,
2920                     urdc->secondary.file);
2921                 rc = RDC_EQALREADY;
2922                 goto failed;
2923         }
2924 
2925         if (uparms->options & RDC_OPT_SECONDARY) { /* how'd we get here? */
2926                 spcs_s_add(kstatus, RDC_EQWRONGMODE);
2927                 rc = RDC_EQWRONGMODE;
2928                 goto failed;
2929         }
2930 
2931         mutex_enter(&rdc_conf_lock);
2932         if (rdc_diskq_inuse(uparms->rdc_set, uparms->rdc_set->disk_queue)) {
2933                 spcs_s_add(kstatus, RDC_EDISKQINUSE,
2934                     uparms->rdc_set->disk_queue);
2935                 rc = RDC_EDISKQINUSE;
2936                 mutex_exit(&rdc_conf_lock);
2937                 goto failed;
2938         }
2939         mutex_exit(&rdc_conf_lock);
2940 
2941         rdc_group_enter(krdc);
2942         rc = _rdc_add_diskq(urdc->index, diskq);
2943         if (rc < 0 || rc == RDC_EQNOADD) {
2944                 group->flags &= ~RDC_DISKQUE;
2945                 group->flags |= RDC_MEMQUE;
2946                 spcs_s_add(kstatus, RDC_EQNOADD, uparms->rdc_set->disk_queue);
2947                 rc = RDC_EQNOADD;
2948         }
2949         rdc_group_exit(krdc);
2950 failed:
2951         return (rc);
2952 }
2953 
2954 int
2955 _rdc_init_diskq(rdc_k_info_t *krdc)
2956 {
2957         rdc_group_t *group = krdc->group;
2958         disk_queue  *q = &group->diskq;
2959 
2960         rdc_init_diskq_header(group, &group->diskq.disk_hdr);
2961         SET_QNXTIO(q, QHEAD(q));
2962 
2963         if (rdc_stamp_diskq(krdc, 0, RDC_NOLOG) < 0)
2964                 goto fail;
2965 
2966         return (0);
2967 fail:
2968         return (-1);
2969 }
2970 
2971 /*
2972  * inititalize the disk queue. This is a destructive
2973  * operation that will not check for emptiness of the queue.
2974  */
2975 int
2976 rdc_init_diskq(rdc_config_t *uparms, spcs_s_info_t kstatus)
2977 {
2978         int rc = 0;
2979         int index;
2980         rdc_k_info_t *krdc, *kp;
2981         rdc_u_info_t *urdc, *up;
2982         rdc_set_t    *uset;
2983         rdc_group_t  *group;
2984         disk_queue   *qp;
2985 
2986         uset = uparms->rdc_set;
2987 
2988         mutex_enter(&rdc_conf_lock);
2989         index = rdc_lookup_byname(uset);
2990         mutex_exit(&rdc_conf_lock);
2991         if (index < 0) {
2992                 spcs_s_add(kstatus, RDC_EALREADY, uset->primary.file,
2993                     uset->secondary.file);
2994                 rc = RDC_EALREADY;
2995                 goto fail;
2996         }
2997 
2998         krdc = &rdc_k_info[index];
2999         urdc = &rdc_u_info[index];
3000         group = krdc->group;
3001         qp = &group->diskq;
3002 
3003         if (!IS_STATE(urdc, RDC_SYNCING) && !IS_STATE(urdc, RDC_LOGGING)) {
3004                 spcs_s_add(kstatus, RDC_EQUEISREP, urdc->disk_queue);
3005                 rc = RDC_EQUEISREP;
3006                 goto fail;
3007         }
3008 
3009         /*
3010          * a couple of big "ifs" here. in the first implementation
3011          * neither of these will be possible. This will come into
3012          * play when we persist the queue across reboots
3013          */
3014         if (!(uparms->options & RDC_OPT_FORCE_QINIT)) {
3015                 if (!QEMPTY(qp)) {
3016                         if (group->rdc_writer) {
3017                                 spcs_s_add(kstatus, RDC_EQFLUSHING,
3018                                     urdc->disk_queue);
3019                                 rc = RDC_EQFLUSHING;
3020                         } else {
3021                                 spcs_s_add(kstatus, RDC_EQNOTEMPTY,
3022                                     urdc->disk_queue);
3023                                 rc = RDC_EQNOTEMPTY;
3024                         }
3025                         goto fail;
3026                 }
3027         }
3028 
3029         mutex_enter(QLOCK(qp));
3030         if (_rdc_init_diskq(krdc) < 0) {
3031                 mutex_exit(QLOCK(qp));
3032                 goto fail;
3033         }
3034         rdc_dump_iohdrs(qp);
3035 
3036         rdc_group_enter(krdc);
3037 
3038         rdc_clr_flags(urdc, RDC_QUEUING);
3039         for (kp = krdc->group_next; kp != krdc; kp = kp->group_next) {
3040                 up = &rdc_u_info[kp->index];
3041                 rdc_clr_flags(up, RDC_QUEUING);
3042         }
3043         rdc_group_exit(krdc);
3044 
3045         mutex_exit(QLOCK(qp));
3046 
3047         return (0);
3048 fail:
3049         /* generic queue failure */
3050         if (!rc) {
3051                 spcs_s_add(kstatus, RDC_EQINITFAIL, urdc->disk_queue);
3052                 rc = RDC_EQINITFAIL;
3053         }
3054 
3055         return (rc);
3056 }
3057 
/*
 * _rdc_kill_diskq
 * Tear down the group's disk queue unconditionally: reset and dump the
 * queue state, close the queue volume, clear the queue name and failure
 * flag on every set in the group, and switch the group to a memory
 * queue.  Always returns 0.
 */
int
_rdc_kill_diskq(rdc_u_info_t *urdc)
{
	rdc_k_info_t *krdc = &rdc_k_info[urdc->index];
	rdc_group_t *group = krdc->group;
	disk_queue *q = &group->diskq;
	rdc_u_info_t *up;
	rdc_k_info_t *p;

	group->flags |= RDC_DISKQ_KILL;
#ifdef DEBUG
	cmn_err(CE_NOTE, "!disabling disk queue %s", urdc->disk_queue);
#endif

	mutex_enter(QLOCK(q));
	rdc_init_diskq_header(group, &q->disk_hdr);
	rdc_dump_iohdrs(q);

	/*
	 * nsc_close the queue and zero out the queue name
	 */
	rdc_wait_qbusy(q);
	rdc_close_diskq(group);
	mutex_exit(QLOCK(q));
	/*
	 * NOTE(review): SET_QSIZE is done after QLOCK is dropped; presumably
	 * safe because the queue is closed at this point — confirm.
	 */
	SET_QSIZE(q, 0);
	rdc_clr_flags(urdc, RDC_DISKQ_FAILED);
	bzero(urdc->disk_queue, NSC_MAXPATH);
	/* propagate the clear to every other set in the circular group */
	for (p = krdc->group_next; p != krdc; p = p->group_next) {
		up = &rdc_u_info[p->index];
		rdc_clr_flags(up, RDC_DISKQ_FAILED);
		bzero(up->disk_queue, NSC_MAXPATH);
	}

#ifdef DEBUG
	cmn_err(CE_NOTE, "!_rdc_kill_diskq: enabling memory queue");
#endif
	group->flags &= ~(RDC_DISKQUE|RDC_DISKQ_KILL);
	group->flags |= RDC_MEMQUE;
	return (0);
}
3098 
3099 /*
3100  * remove this diskq regardless of whether it is draining or not
3101  * stops the flusher by invalidating the qdata (ie, instant empty)
3102  * remove the disk qeueue from the group, leaving the group with a memory
3103  * queue.
3104  */
3105 int
3106 rdc_kill_diskq(rdc_config_t *uparms, spcs_s_info_t kstatus)
3107 {
3108         int rc;
3109         int index;
3110         rdc_u_info_t *urdc;
3111         rdc_k_info_t *krdc;
3112         rdc_set_t *rdc_set = uparms->rdc_set;
3113 
3114         mutex_enter(&rdc_conf_lock);
3115         index = rdc_lookup_byname(uparms->rdc_set);
3116         mutex_exit(&rdc_conf_lock);
3117 
3118         if (index < 0) {
3119                 spcs_s_add(kstatus, RDC_EALREADY, rdc_set->primary.file,
3120                     rdc_set->secondary.file);
3121                 rc = RDC_EALREADY;
3122                 goto failed;
3123         }
3124 
3125         urdc = &rdc_u_info[index];
3126         krdc = &rdc_k_info[index];
3127 
3128         if (!RDC_IS_DISKQ(krdc->group)) {
3129                 spcs_s_add(kstatus, RDC_EQNOQUEUE, rdc_set->primary.intf,
3130                     rdc_set->primary.file, rdc_set->secondary.intf,
3131                     rdc_set->secondary.file);
3132                 rc = RDC_EQNOQUEUE;
3133                 goto failed;
3134         }
3135 
3136 /*
3137  *      if (!IS_STATE(urdc, RDC_LOGGING)) {
3138  *              spcs_s_add(kstatus, RDC_EQNOTLOGGING,
3139  *                  uparms->rdc_set->disk_queue);
3140  *              rc = RDC_EQNOTLOGGING;
3141  *              goto failed;
3142  *      }
3143  */
3144         rdc_unintercept_diskq(krdc->group); /* stop protecting queue */
3145         rdc_group_enter(krdc); /* to prevent further flushing */
3146         rc = _rdc_kill_diskq(urdc);
3147         rdc_group_exit(krdc);
3148 
3149 failed:
3150         return (rc);
3151 }
3152 
3153 /*
3154  * remove a diskq from a group.
3155  * removal of a diskq from a set, or rather
3156  * a set from a queue, is done by reconfigging out
3157  * of the group. This removes the diskq from a whole
3158  * group and replaces it with a memory based queue
3159  */
3160 #define NUM_RETRIES     15      /* Number of retries to wait if no progress */
3161 int
3162 rdc_rem_diskq(rdc_config_t *uparms, spcs_s_info_t kstatus)
3163 {
3164         int index;
3165         rdc_u_info_t *urdc;
3166         rdc_k_info_t *krdc;
3167         rdc_k_info_t *this;
3168         volatile rdc_group_t *group;
3169         volatile disk_queue *diskq;
3170         int threads, counter;
3171         long blocks;
3172 
3173         mutex_enter(&rdc_conf_lock);
3174         index = rdc_lookup_byname(uparms->rdc_set);
3175         mutex_exit(&rdc_conf_lock);
3176         if (index < 0) {
3177                 spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
3178                     uparms->rdc_set->secondary.file);
3179                 return (RDC_EALREADY);
3180         }
3181 
3182         urdc = &rdc_u_info[index];
3183         this = &rdc_k_info[index];
3184         krdc = &rdc_k_info[index];
3185 
3186         do {
3187                 if (!IS_STATE(urdc, RDC_LOGGING)) {
3188                         spcs_s_add(kstatus, RDC_EQNOTLOGGING,
3189                             urdc->disk_queue);
3190                         return (RDC_EQNOTLOGGING);
3191                 }
3192                 krdc = krdc->group_next;
3193                 urdc = &rdc_u_info[krdc->index];
3194 
3195         } while (krdc != this);
3196 
3197         /*
3198          * If there is no group or diskq configured, we can leave now
3199          */
3200         if (!(group = krdc->group) || !(diskq = &group->diskq))
3201                 return (0);
3202 
3203 
3204         /*
3205          * Wait if not QEMPTY or threads still active
3206          */
3207         counter = 0;
3208         while (!QEMPTY(diskq) || group->rdc_thrnum) {
3209 
3210                 /*
3211                  * Capture counters to determine if progress is being made
3212                  */
3213                 blocks = QBLOCKS(diskq);
3214                 threads = group->rdc_thrnum;
3215 
3216                 /*
3217                  * Wait
3218                  */
3219                 delay(HZ);
3220 
3221                 /*
3222                  * Has the group or disk queue gone away while delayed?
3223                  */
3224                 if (!(group = krdc->group) || !(diskq = &group->diskq))
3225                         return (0);
3226 
3227                 /*
3228                  * Are we still seeing progress?
3229                  */
3230                 if (blocks == QBLOCKS(diskq) && threads == group->rdc_thrnum) {
3231                         /*
3232                          * No progress see, decrement retry counter
3233                          */
3234                         if (counter++ > NUM_RETRIES) {
3235                                 /*
3236                                  * No progress seen, increment retry counter
3237                                  */
3238                                 int rc = group->rdc_thrnum ?
3239                                     RDC_EQFLUSHING : RDC_EQNOTEMPTY;
3240                                 spcs_s_add(kstatus, rc, urdc->disk_queue);
3241                                 return (rc);
3242                         }
3243                 } else {
3244                         /*
3245                          * Reset counter, as we've made progress
3246                          */
3247                         counter = 0;
3248                 }
3249         }
3250 
3251         return (0);
3252 }