1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright 2009 Sun Microsystems, Inc. All rights reserved. 23 * Use is subject to license terms. 24 */ 25 26 #include <sys/types.h> 27 #include <sys/ksynch.h> 28 #include <sys/cmn_err.h> 29 #include <sys/kmem.h> 30 #include <sys/stat.h> 31 #include <sys/errno.h> 32 33 #include "../solaris/nsc_thread.h" 34 #ifdef DS_DDICT 35 #include "../contract.h" 36 #endif 37 #include <sys/nsctl/nsctl.h> 38 39 #include <sys/kmem.h> 40 #include <sys/ddi.h> 41 42 #include <sys/sdt.h> /* dtrace is S10 or later */ 43 44 #include "rdc_io.h" 45 #include "rdc_bitmap.h" 46 #include "rdc_diskq.h" 47 #include "rdc_clnt.h" 48 49 #include <sys/unistat/spcs_s.h> 50 #include <sys/unistat/spcs_s_k.h> 51 #include <sys/unistat/spcs_errors.h> 52 53 extern nsc_io_t *_rdc_io_hc; 54 55 int rdc_diskq_coalesce = 0; 56 57 int 58 _rdc_rsrv_diskq(rdc_group_t *group) 59 { 60 int rc = 0; 61 62 mutex_enter(&group->diskqmutex); 63 if (group->diskqfd == NULL) { 64 mutex_exit(&group->diskqmutex); 65 return (EIO); 66 } else if ((group->diskqrsrv == 0) && 67 (rc = nsc_reserve(group->diskqfd, 0)) != 0) { 68 cmn_err(CE_WARN, 69 "!rdc: nsc_reserve(%s) failed %d\n", 70 
nsc_pathname(group->diskqfd), rc); 71 } else { 72 group->diskqrsrv++; 73 } 74 75 mutex_exit(&group->diskqmutex); 76 return (rc); 77 } 78 79 void 80 _rdc_rlse_diskq(rdc_group_t *group) 81 { 82 mutex_enter(&group->diskqmutex); 83 if (group->diskqrsrv > 0 && --group->diskqrsrv == 0) { 84 nsc_release(group->diskqfd); 85 } 86 mutex_exit(&group->diskqmutex); 87 } 88 89 void 90 rdc_wait_qbusy(disk_queue *q) 91 { 92 ASSERT(MUTEX_HELD(QLOCK(q))); 93 while (q->busycnt > 0) 94 cv_wait(&q->busycv, QLOCK(q)); 95 } 96 97 void 98 rdc_set_qbusy(disk_queue *q) 99 { 100 ASSERT(MUTEX_HELD(QLOCK(q))); 101 q->busycnt++; 102 } 103 104 void 105 rdc_clr_qbusy(disk_queue *q) 106 { 107 ASSERT(MUTEX_HELD(QLOCK(q))); 108 q->busycnt--; 109 if (q->busycnt == 0) 110 cv_broadcast(&q->busycv); 111 } 112 113 int 114 rdc_lookup_diskq(char *pathname) 115 { 116 rdc_u_info_t *urdc; 117 #ifdef DEBUG 118 rdc_k_info_t *krdc; 119 #endif 120 int index; 121 122 for (index = 0; index < rdc_max_sets; index++) { 123 urdc = &rdc_u_info[index]; 124 #ifdef DEBUG 125 krdc = &rdc_k_info[index]; 126 #endif 127 ASSERT(krdc->index == index); 128 ASSERT(urdc->index == index); 129 if (!IS_ENABLED(urdc)) 130 continue; 131 132 if (strncmp(pathname, urdc->disk_queue, 133 NSC_MAXPATH) == 0) 134 return (index); 135 } 136 137 return (-1); 138 } 139 140 void 141 rdc_unintercept_diskq(rdc_group_t *grp) 142 { 143 if (!RDC_IS_DISKQ(grp)) 144 return; 145 if (grp->q_tok) 146 (void) nsc_unregister_path(grp->q_tok, 0); 147 grp->q_tok = NULL; 148 } 149 150 void 151 rdc_close_diskq(rdc_group_t *grp) 152 { 153 154 if (grp == NULL) { 155 #ifdef DEBUG 156 cmn_err(CE_WARN, "!rdc_close_diskq: NULL group!"); 157 #endif 158 return; 159 } 160 161 if (grp->diskqfd) { 162 if (nsc_close(grp->diskqfd) != 0) { 163 #ifdef DEBUG 164 cmn_err(CE_WARN, "!nsc_close on diskq failed"); 165 #else 166 ; 167 /*EMPTY*/ 168 #endif 169 } 170 grp->diskqfd = 0; 171 grp->diskqrsrv = 0; 172 } 173 bzero(&grp->diskq.disk_hdr, sizeof (diskq_header)); 174 } 175 176 /* 
177 * nsc_open the diskq and attach 178 * the nsc_fd to krdc->diskqfd 179 */ 180 int 181 rdc_open_diskq(rdc_k_info_t *krdc) 182 { 183 rdc_u_info_t *urdc; 184 rdc_group_t *grp; 185 int sts; 186 nsc_size_t size; 187 char *diskqname; 188 int mutexheld = 0; 189 190 grp = krdc->group; 191 urdc = &rdc_u_info[krdc->index]; 192 193 mutex_enter(&grp->diskqmutex); 194 mutexheld++; 195 if (urdc->disk_queue[0] == '\0') { 196 goto fail; 197 } 198 199 diskqname = &urdc->disk_queue[0]; 200 201 if (grp->diskqfd == NULL) { 202 grp->diskqfd = nsc_open(diskqname, 203 NSC_RDCHR_ID|NSC_DEVICE|NSC_WRITE, 0, 0, 0); 204 if (grp->diskqfd == NULL) { 205 cmn_err(CE_WARN, "!rdc_open_diskq: Unable to open %s", 206 diskqname); 207 goto fail; 208 } 209 } 210 if (!grp->q_tok) 211 grp->q_tok = nsc_register_path(urdc->disk_queue, 212 NSC_DEVICE | NSC_CACHE, _rdc_io_hc); 213 214 grp->diskqrsrv = 0; /* init reserve count */ 215 216 mutex_exit(&grp->diskqmutex); 217 mutexheld--; 218 /* just test a reserve release */ 219 sts = _rdc_rsrv_diskq(grp); 220 if (!RDC_SUCCESS(sts)) { 221 cmn_err(CE_WARN, "!rdc_open_diskq: Reserve failed for %s", 222 diskqname); 223 goto fail; 224 } 225 sts = nsc_partsize(grp->diskqfd, &size); 226 _rdc_rlse_diskq(grp); 227 228 if ((sts == 0) && (size < 1)) { 229 rdc_unintercept_diskq(grp); 230 rdc_close_diskq(grp); 231 goto fail; 232 } 233 234 return (0); 235 236 fail: 237 bzero(&urdc->disk_queue, NSC_MAXPATH); 238 if (mutexheld) 239 mutex_exit(&grp->diskqmutex); 240 return (-1); 241 242 } 243 244 /* 245 * rdc_count_vecs 246 * simply vec++'s until sb_addr is null 247 * returns number of vectors encountered 248 */ 249 int 250 rdc_count_vecs(nsc_vec_t *vec) 251 { 252 nsc_vec_t *vecp; 253 int i = 0; 254 vecp = vec; 255 while (vecp->sv_addr) { 256 vecp++; 257 i++; 258 } 259 return (i+1); 260 } 261 /* 262 * rdc_setid2idx 263 * given setid, return index 264 */ 265 int 266 rdc_setid2idx(int setid) 267 { 268 269 int index = 0; 270 271 for (index = 0; index < rdc_max_sets; index++) { 
272 if (rdc_u_info[index].setid == setid) 273 break; 274 } 275 if (index >= rdc_max_sets) 276 index = -1; 277 return (index); 278 } 279 280 /* 281 * rdc_idx2setid 282 * given an index, return its setid 283 */ 284 int 285 rdc_idx2setid(int index) 286 { 287 return (rdc_u_info[index].setid); 288 } 289 290 /* 291 * rdc_fill_ioheader 292 * fill in all the stuff you want to save on disk 293 * at the beginnig of each queued write 294 */ 295 void 296 rdc_fill_ioheader(rdc_aio_t *aio, io_hdr *hd, int qpos) 297 { 298 ASSERT(MUTEX_HELD(&rdc_k_info[aio->index].group->diskq.disk_qlock)); 299 300 hd->dat.magic = RDC_IOHDR_MAGIC; 301 hd->dat.type = RDC_QUEUEIO; 302 hd->dat.pos = aio->pos; 303 hd->dat.hpos = aio->pos; 304 hd->dat.qpos = qpos; 305 hd->dat.len = aio->len; 306 hd->dat.flag = aio->flag; 307 hd->dat.iostatus = aio->iostatus; 308 hd->dat.setid = rdc_idx2setid(aio->index); 309 hd->dat.time = nsc_time(); 310 if (!aio->handle) 311 hd->dat.flag |= RDC_NULL_BUF; /* no real data to queue */ 312 } 313 314 /* 315 * rdc_dump_iohdrs 316 * give back the iohdr list 317 * and clear out q->lastio 318 */ 319 void 320 rdc_dump_iohdrs(disk_queue *q) 321 { 322 io_hdr *p, *r; 323 324 ASSERT(MUTEX_HELD(QLOCK(q))); 325 326 p = q->iohdrs; 327 while (p) { 328 r = p->dat.next; 329 kmem_free(p, sizeof (*p)); 330 q->hdrcnt--; 331 p = r; 332 } 333 q->iohdrs = q->hdr_last = NULL; 334 q->hdrcnt = 0; 335 if (q->lastio->handle) 336 (void) nsc_free_buf(q->lastio->handle); 337 bzero(&(*q->lastio), sizeof (*q->lastio)); 338 } 339 340 /* 341 * rdc_fail_diskq 342 * set flags, throw away q info 343 * clean up what you can 344 * wait for flusher threads to stop (taking into account this may be one) 345 * takes group_lock, so conf, many, and bitmap may not be held 346 */ 347 void 348 rdc_fail_diskq(rdc_k_info_t *krdc, int wait, int flag) 349 { 350 rdc_k_info_t *p; 351 rdc_u_info_t *q = &rdc_u_info[krdc->index]; 352 rdc_group_t *group = krdc->group; 353 disk_queue *dq = &krdc->group->diskq; 354 355 if 
(IS_STATE(q, RDC_DISKQ_FAILED)) 356 return; 357 358 if (!(flag & RDC_NOFAIL)) 359 cmn_err(CE_WARN, "!disk queue %s failure", q->disk_queue); 360 361 if (flag & RDC_DOLOG) { 362 rdc_group_enter(krdc); 363 rdc_group_log(krdc, RDC_NOFLUSH | RDC_ALLREMOTE, 364 "disk queue failed"); 365 rdc_group_exit(krdc); 366 } 367 mutex_enter(QHEADLOCK(dq)); 368 mutex_enter(QLOCK(dq)); 369 /* 370 * quick stop of the flushers 371 * other cleanup is done on the un-failing of the diskq 372 */ 373 SET_QHEAD(dq, RDC_DISKQ_DATA_OFF); 374 SET_QTAIL(dq, RDC_DISKQ_DATA_OFF); 375 SET_QNXTIO(dq, RDC_DISKQ_DATA_OFF); 376 SET_LASTQTAIL(dq, 0); 377 378 rdc_dump_iohdrs(dq); 379 380 mutex_exit(QLOCK(dq)); 381 mutex_exit(QHEADLOCK(dq)); 382 383 bzero(krdc->bitmap_ref, krdc->bitmap_size * BITS_IN_BYTE * 384 BMAP_REF_PREF_SIZE); 385 386 if (flag & RDC_DOLOG) /* otherwise, we already have the conf lock */ 387 rdc_group_enter(krdc); 388 389 else if (!(flag & RDC_GROUP_LOCKED)) 390 ASSERT(MUTEX_HELD(&rdc_conf_lock)); 391 392 if (!(flag & RDC_NOFAIL)) { 393 rdc_set_flags(q, RDC_DISKQ_FAILED); 394 } 395 rdc_clr_flags(q, RDC_QUEUING); 396 397 for (p = krdc->group_next; p != krdc; p = p->group_next) { 398 q = &rdc_u_info[p->index]; 399 if (!IS_ENABLED(q)) 400 continue; 401 if (!(flag & RDC_NOFAIL)) { 402 rdc_set_flags(q, RDC_DISKQ_FAILED); 403 } 404 rdc_clr_flags(q, RDC_QUEUING); 405 bzero(p->bitmap_ref, p->bitmap_size * BITS_IN_BYTE * 406 BMAP_REF_PREF_SIZE); 407 /* RDC_QUEUING is cleared in group_log() */ 408 } 409 410 if (flag & RDC_DOLOG) 411 rdc_group_exit(krdc); 412 413 /* can't wait for myself to go away, I'm a flusher */ 414 if (wait & RDC_WAIT) 415 while (group->rdc_thrnum) 416 delay(2); 417 418 } 419 420 /* 421 * rdc_stamp_diskq 422 * write out diskq header info 423 * must have disk_qlock held 424 * if rsrvd flag is 0, the nsc_reserve is done 425 */ 426 int 427 rdc_stamp_diskq(rdc_k_info_t *krdc, int rsrvd, int failflags) 428 { 429 nsc_vec_t vec[2]; 430 nsc_buf_t *head = NULL; 431 rdc_group_t *grp; 
432 rdc_u_info_t *urdc; 433 disk_queue *q; 434 int rc, flags; 435 436 grp = krdc->group; 437 q = &krdc->group->diskq; 438 439 ASSERT(MUTEX_HELD(&q->disk_qlock)); 440 441 urdc = &rdc_u_info[krdc->index]; 442 443 if (!rsrvd && _rdc_rsrv_diskq(grp)) { 444 cmn_err(CE_WARN, "!rdc_stamp_diskq: %s reserve failed", 445 urdc->disk_queue); 446 mutex_exit(QLOCK(q)); 447 rdc_fail_diskq(krdc, RDC_NOWAIT, failflags); 448 mutex_enter(QLOCK(q)); 449 return (-1); 450 } 451 flags = NSC_WRITE | NSC_NOCACHE | NSC_NODATA; 452 rc = nsc_alloc_buf(grp->diskqfd, 0, 1, flags, &head); 453 454 if (!RDC_SUCCESS(rc)) { 455 cmn_err(CE_WARN, "!Alloc buf failed for disk queue %s", 456 &urdc->disk_queue[0]); 457 mutex_exit(QLOCK(q)); 458 rdc_fail_diskq(krdc, RDC_NOWAIT, failflags); 459 mutex_enter(QLOCK(q)); 460 return (-1); 461 } 462 vec[0].sv_len = FBA_SIZE(1); 463 vec[0].sv_addr = (uchar_t *)&q->disk_hdr; 464 vec[1].sv_len = 0; 465 vec[1].sv_addr = NULL; 466 467 head->sb_vec = &vec[0]; 468 469 #ifdef DEBUG_DISKQ 470 cmn_err(CE_NOTE, "!rdc_stamp_diskq: hdr: %p magic: %x state: " 471 "%x head: %d tail: %d size: %d nitems: %d blocks: %d", 472 q, QMAGIC(q), QSTATE(q), QHEAD(q), 473 QTAIL(q), QSIZE(q), QNITEMS(q), QBLOCKS(q)); 474 #endif 475 476 rc = nsc_write(head, 0, 1, 0); 477 478 if (!RDC_SUCCESS(rc)) { 479 if (!rsrvd) 480 _rdc_rlse_diskq(grp); 481 cmn_err(CE_CONT, "!disk queue %s failed rc %d", 482 &urdc->disk_queue[0], rc); 483 mutex_exit(QLOCK(q)); 484 rdc_fail_diskq(krdc, RDC_NOWAIT, failflags); 485 mutex_enter(QLOCK(q)); 486 return (-1); 487 } 488 489 (void) nsc_free_buf(head); 490 if (!rsrvd) 491 _rdc_rlse_diskq(grp); 492 493 return (0); 494 } 495 496 /* 497 * rdc_init_diskq_header 498 * load initial values into the header 499 */ 500 void 501 rdc_init_diskq_header(rdc_group_t *grp, dqheader *header) 502 { 503 int rc; 504 int type = 0; 505 disk_queue *q = &grp->diskq; 506 507 ASSERT(MUTEX_HELD(QLOCK(q))); 508 509 /* save q type if this is a failure */ 510 if (QSTATE(q) & RDC_QNOBLOCK) 511 
type = RDC_QNOBLOCK; 512 bzero(header, sizeof (*header)); 513 header->h.magic = RDC_DISKQ_MAGIC; 514 header->h.vers = RDC_DISKQ_VERS; 515 header->h.state |= (RDC_SHUTDOWN_BAD|type); /* SHUTDOWN_OK on suspend */ 516 header->h.head_offset = RDC_DISKQ_DATA_OFF; 517 header->h.tail_offset = RDC_DISKQ_DATA_OFF; 518 header->h.nitems = 0; 519 header->h.blocks = 0; 520 header->h.qwrap = 0; 521 SET_QNXTIO(q, QHEAD(q)); 522 SET_QCOALBOUNDS(q, RDC_DISKQ_DATA_OFF); 523 524 /* do this last, as this might be a failure. get the kernel state ok */ 525 rc = _rdc_rsrv_diskq(grp); 526 if (!RDC_SUCCESS(rc)) { 527 cmn_err(CE_WARN, "!init_diskq_hdr: Reserve failed for queue"); 528 return; 529 } 530 (void) nsc_partsize(grp->diskqfd, &header->h.disk_size); 531 _rdc_rlse_diskq(grp); 532 533 } 534 535 /* 536 * rdc_unfail_diskq 537 * the diskq failed for some reason, lets try and re-start it 538 * the old stuff has already been thrown away 539 * should just be called from rdc_sync 540 */ 541 void 542 rdc_unfail_diskq(rdc_k_info_t *krdc) 543 { 544 rdc_k_info_t *p; 545 rdc_u_info_t *q = &rdc_u_info[krdc->index]; 546 rdc_group_t *group = krdc->group; 547 disk_queue *dq = &group->diskq; 548 549 rdc_group_enter(krdc); 550 rdc_clr_flags(q, RDC_ASYNC); 551 /* someone else won the race... 
*/ 552 if (!IS_STATE(q, RDC_DISKQ_FAILED)) { 553 rdc_group_exit(krdc); 554 return; 555 } 556 rdc_clr_flags(q, RDC_DISKQ_FAILED); 557 for (p = krdc->group_next; p != krdc; p = p->group_next) { 558 q = &rdc_u_info[p->index]; 559 if (!IS_ENABLED(q)) 560 continue; 561 rdc_clr_flags(q, RDC_DISKQ_FAILED); 562 rdc_clr_flags(q, RDC_ASYNC); 563 if (IS_STATE(q, RDC_QUEUING)) 564 rdc_clr_flags(q, RDC_QUEUING); 565 } 566 rdc_group_exit(krdc); 567 568 mutex_enter(QLOCK(dq)); 569 570 rdc_init_diskq_header(group, &group->diskq.disk_hdr); 571 /* real i/o to the queue */ 572 /* clear RDC_AUXSYNCIP because we cannot halt a sync that's not here */ 573 krdc->aux_state &= ~RDC_AUXSYNCIP; 574 if (rdc_stamp_diskq(krdc, 0, RDC_GROUP_LOCKED | RDC_DOLOG) < 0) { 575 mutex_exit(QLOCK(dq)); 576 goto fail; 577 } 578 579 SET_QNXTIO(dq, QHEAD(dq)); 580 SET_QHDRCNT(dq, 0); 581 SET_QSTATE(dq, RDC_SHUTDOWN_BAD); /* only suspend can write good */ 582 dq->iohdrs = NULL; 583 dq->hdr_last = NULL; 584 585 /* should be none, but.. */ 586 rdc_dump_iohdrs(dq); 587 588 mutex_exit(QLOCK(dq)); 589 590 591 fail: 592 krdc->aux_state |= RDC_AUXSYNCIP; 593 return; 594 595 } 596 597 int 598 rdc_read_diskq_header(rdc_k_info_t *krdc) 599 { 600 int rc; 601 diskq_header *header; 602 rdc_u_info_t *urdc = &rdc_u_info[krdc->index]; 603 604 if (krdc->group->diskqfd == NULL) { 605 char buf[NSC_MAXPATH]; 606 (void) snprintf(buf, NSC_MAXPATH, "%s:%s", urdc->secondary.intf, 607 &urdc->secondary.intf[0]); 608 cmn_err(CE_WARN, "!Disk Queue Header read failed for %s", 609 urdc->group_name[0] == '\0' ? 
buf: 610 &urdc->group_name[0]); 611 return (-1); 612 } 613 614 header = &krdc->group->diskq.disk_hdr.h; 615 if (_rdc_rsrv_diskq(krdc->group)) { 616 return (-1); 617 } 618 619 rc = rdc_ns_io(krdc->group->diskqfd, NSC_RDBUF, 0, 620 (uchar_t *)header, sizeof (diskq_header)); 621 622 _rdc_rlse_diskq(krdc->group); 623 624 if (!RDC_SUCCESS(rc)) { 625 char buf[NSC_MAXPATH]; 626 (void) snprintf(buf, NSC_MAXPATH, "%s:%s", urdc->secondary.intf, 627 &urdc->secondary.file[0]); 628 cmn_err(CE_WARN, "!Disk Queue Header read failed(%d) for %s", 629 rc, urdc->group_name[0] == '\0' ? buf : 630 &urdc->group_name[0]); 631 return (-1); 632 } 633 return (0); 634 } 635 636 /* 637 * rdc_stop_diskq_flusher 638 */ 639 void 640 rdc_stop_diskq_flusher(rdc_k_info_t *krdc) 641 { 642 disk_queue q, *qp; 643 rdc_group_t *group; 644 #ifdef DEBUG 645 cmn_err(CE_NOTE, "!stopping flusher threads"); 646 #endif 647 group = krdc->group; 648 qp = &krdc->group->diskq; 649 650 /* save the queue info */ 651 q = *qp; 652 653 /* lie a little */ 654 SET_QTAIL(qp, RDC_DISKQ_DATA_OFF); 655 SET_QHEAD(qp, RDC_DISKQ_DATA_OFF); 656 SET_QSTATE(qp, RDC_QDISABLEPEND); 657 SET_QSTATE(qp, RDC_STOPPINGFLUSH); 658 659 /* drop locks to allow flushers to die */ 660 mutex_exit(QLOCK(qp)); 661 mutex_exit(QHEADLOCK(qp)); 662 rdc_group_exit(krdc); 663 664 while (group->rdc_thrnum) 665 delay(2); 666 667 rdc_group_enter(krdc); 668 mutex_enter(QHEADLOCK(qp)); 669 mutex_enter(QLOCK(qp)); 670 671 CLR_QSTATE(qp, RDC_STOPPINGFLUSH); 672 *qp = q; 673 } 674 675 /* 676 * rdc_enable_diskq 677 * open the diskq 678 * and stamp the header onto it. 
679 */ 680 int 681 rdc_enable_diskq(rdc_k_info_t *krdc) 682 { 683 rdc_group_t *group; 684 disk_queue *q; 685 686 group = krdc->group; 687 q = &group->diskq; 688 689 if (rdc_open_diskq(krdc) < 0) 690 goto fail; 691 692 mutex_enter(QLOCK(q)); 693 rdc_init_diskq_header(group, &group->diskq.disk_hdr); 694 695 if (rdc_stamp_diskq(krdc, 0, RDC_NOLOG) < 0) { 696 mutex_exit(QLOCK(q)); 697 goto fail; 698 } 699 700 SET_QNXTIO(q, QHEAD(q)); 701 702 mutex_exit(QLOCK(q)); 703 return (0); 704 705 fail: 706 mutex_enter(&group->diskqmutex); 707 rdc_close_diskq(group); 708 mutex_exit(&group->diskqmutex); 709 710 /* caller has to fail diskq after dropping conf & many locks */ 711 return (RDC_EQNOADD); 712 } 713 714 /* 715 * rdc_resume_diskq 716 * open the diskq and read the header 717 */ 718 int 719 rdc_resume_diskq(rdc_k_info_t *krdc) 720 { 721 rdc_u_info_t *urdc; 722 rdc_group_t *group; 723 disk_queue *q; 724 int rc = 0; 725 726 urdc = &rdc_u_info[krdc->index]; 727 group = krdc->group; 728 q = &group->diskq; 729 730 if (rdc_open_diskq(krdc) < 0) { 731 rc = RDC_EQNOADD; 732 goto fail; 733 } 734 735 mutex_enter(QLOCK(q)); 736 737 rdc_init_diskq_header(group, &group->diskq.disk_hdr); 738 739 if (rdc_read_diskq_header(krdc) < 0) { 740 SET_QSTATE(q, RDC_QBADRESUME); 741 rc = RDC_EQNOADD; 742 } 743 744 /* check diskq magic number */ 745 if (QMAGIC(q) != RDC_DISKQ_MAGIC) { 746 cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s," 747 " incorrect magic number in header", urdc->disk_queue); 748 rdc_init_diskq_header(group, &group->diskq.disk_hdr); 749 SET_QSTATE(q, RDC_QBADRESUME); 750 rc = RDC_EQNOADD; 751 } else switch (QVERS(q)) { 752 diskq_header1 h1; /* version 1 header */ 753 diskq_header *hc; /* current header */ 754 755 #ifdef NSC_MULTI_TERABYTE 756 case RDC_DISKQ_VER_ORIG: 757 /* version 1 diskq header, upgrade to 64bit version */ 758 h1 = *(diskq_header1 *)(&group->diskq.disk_hdr.h); 759 hc = &group->diskq.disk_hdr.h; 760 761 cmn_err(CE_WARN, "!SNDR: old version header for diskq 
%s," 762 " upgrading to current version", urdc->disk_queue); 763 hc->vers = RDC_DISKQ_VERS; 764 hc->state = h1.state; 765 hc->head_offset = h1.head_offset; 766 hc->tail_offset = h1.tail_offset; 767 hc->disk_size = h1.disk_size; 768 hc->nitems = h1.nitems; 769 hc->blocks = h1.blocks; 770 hc->qwrap = h1.qwrap; 771 hc->auxqwrap = h1.auxqwrap; 772 hc->seq_last = h1.seq_last; 773 hc->ack_last = h1.ack_last; 774 775 if (hc->nitems > 0) { 776 cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s," 777 " old version Q contains data", urdc->disk_queue); 778 rdc_init_diskq_header(group, &group->diskq.disk_hdr); 779 SET_QSTATE(q, RDC_QBADRESUME); 780 rc = RDC_EQNOADD; 781 } 782 break; 783 #else 784 case RDC_DISKQ_VER_64BIT: 785 cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s," 786 " diskq header newer than current version", 787 urdc->disk_queue); 788 rdc_init_diskq_header(group, &group->diskq.disk_hdr); 789 SET_QSTATE(q, RDC_QBADRESUME); 790 rc = RDC_EQNOADD; 791 break; 792 #endif 793 case RDC_DISKQ_VERS: 794 /* okay, current version diskq */ 795 break; 796 default: 797 cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s," 798 " unknown diskq header version", urdc->disk_queue); 799 rdc_init_diskq_header(group, &group->diskq.disk_hdr); 800 SET_QSTATE(q, RDC_QBADRESUME); 801 rc = RDC_EQNOADD; 802 break; 803 } 804 if (IS_QSTATE(q, RDC_SHUTDOWN_BAD)) { 805 cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s," 806 " unsafe shutdown", urdc->disk_queue); 807 rdc_init_diskq_header(group, &group->diskq.disk_hdr); 808 SET_QSTATE(q, RDC_QBADRESUME); 809 rc = RDC_EQNOADD; 810 } 811 812 CLR_QSTATE(q, RDC_SHUTDOWN_OK); 813 SET_QSTATE(q, RDC_SHUTDOWN_BAD); 814 815 /* bad, until proven not bad */ 816 if (rdc_stamp_diskq(krdc, 0, RDC_NOLOG) < 0) { 817 rdc_fail_diskq(krdc, RDC_NOWAIT, RDC_NOLOG); 818 rc = RDC_EQNOADD; 819 } 820 821 SET_QNXTIO(q, QHEAD(q)); 822 group->diskq.nitems_hwm = QNITEMS(q); 823 group->diskq.blocks_hwm = QBLOCKS(q); 824 825 mutex_exit(QLOCK(q)); 826 827 #ifdef DEBUG 828 
cmn_err(CE_NOTE, "!rdc_resume_diskq: resuming diskq %s \n", 829 urdc->disk_queue); 830 cmn_err(CE_NOTE, "!qinfo: " QDISPLAY(q)); 831 #endif 832 if (rc == 0) 833 return (0); 834 835 fail: 836 837 /* caller has to set the diskq failed after dropping it's locks */ 838 return (rc); 839 840 } 841 842 int 843 rdc_suspend_diskq(rdc_k_info_t *krdc) 844 { 845 int rc; 846 rdc_u_info_t *urdc = &rdc_u_info[krdc->index]; 847 disk_queue *q; 848 849 q = &krdc->group->diskq; 850 851 /* grab both diskq locks as we are going to kill the flusher */ 852 mutex_enter(QHEADLOCK(q)); 853 mutex_enter(QLOCK(q)); 854 855 if ((krdc->group->rdc_thrnum) && (!IS_QSTATE(q, RDC_STOPPINGFLUSH))) { 856 SET_QSTATE(q, RDC_STOPPINGFLUSH); 857 rdc_stop_diskq_flusher(krdc); 858 CLR_QSTATE(q, RDC_STOPPINGFLUSH); 859 } 860 861 krdc->group->diskq.disk_hdr.h.state &= ~RDC_SHUTDOWN_BAD; 862 krdc->group->diskq.disk_hdr.h.state |= RDC_SHUTDOWN_OK; 863 krdc->group->diskq.disk_hdr.h.state &= ~RDC_QBADRESUME; 864 865 /* let's make sure that the flusher has stopped.. 
*/ 866 if (krdc->group->rdc_thrnum) { 867 mutex_exit(QLOCK(q)); 868 mutex_exit(QHEADLOCK(q)); 869 rdc_group_exit(krdc); 870 871 while (krdc->group->rdc_thrnum) 872 delay(5); 873 874 rdc_group_enter(krdc); 875 mutex_enter(QLOCK(q)); 876 mutex_enter(QHEADLOCK(q)); 877 } 878 /* write refcount to the bitmap */ 879 if ((rc = rdc_write_refcount(krdc)) < 0) { 880 rdc_group_exit(krdc); 881 goto fail; 882 } 883 884 if (!QEMPTY(q)) { 885 rdc_set_flags(urdc, RDC_QUEUING); 886 } else { 887 rdc_clr_flags(urdc, RDC_QUEUING); 888 } 889 890 /* fill in diskq header info */ 891 krdc->group->diskq.disk_hdr.h.state &= ~RDC_QDISABLEPEND; 892 893 #ifdef DEBUG 894 cmn_err(CE_NOTE, "!suspending disk queue\n" QDISPLAY(q)); 895 #endif 896 897 /* to avoid a possible deadlock, release in order, and reacquire */ 898 mutex_exit(QLOCK(q)); 899 mutex_exit(QHEADLOCK(q)); 900 901 if (krdc->group->count > 1) { 902 rdc_group_exit(krdc); 903 goto fail; /* just stamp on the last suspend */ 904 } 905 rdc_group_exit(krdc); /* in case this stamp fails */ 906 mutex_enter(QLOCK(q)); 907 908 rc = rdc_stamp_diskq(krdc, 0, RDC_NOLOG); 909 910 mutex_exit(QLOCK(q)); 911 912 fail: 913 rdc_group_enter(krdc); 914 915 /* diskq already failed if stamp failed */ 916 917 return (rc); 918 } 919 920 /* 921 * copy orig aio to copy, including the nsc_buf_t 922 */ 923 int 924 rdc_dup_aio(rdc_aio_t *orig, rdc_aio_t *copy) 925 { 926 int rc; 927 bcopy(orig, copy, sizeof (*orig)); 928 copy->handle = NULL; 929 930 if (orig->handle == NULL) /* no buf to alloc/copy */ 931 return (0); 932 933 rc = nsc_alloc_abuf(orig->pos, orig->len, 0, ©->handle); 934 if (!RDC_SUCCESS(rc)) { 935 #ifdef DEBUG 936 cmn_err(CE_WARN, "!rdc_dup_aio: alloc_buf failed (%d)", rc); 937 #endif 938 return (rc); 939 } 940 rc = nsc_copy(orig->handle, copy->handle, orig->pos, 941 orig->pos, orig->len); 942 if (!RDC_SUCCESS(rc)) { 943 (void) nsc_free_buf(copy->handle); 944 #ifdef DEBUG 945 cmn_err(CE_WARN, "!rdc_dup_aio: copy buf failed (%d)", rc); 946 #endif 947 
return (rc); 948 } 949 return (0); 950 } 951 952 /* 953 * rdc_qfill_shldwakeup() 954 * 0 if the memory queue has filled, and the low water 955 * mark has not been reached. 0 if diskq is empty. 956 * 1 if less than low water mark 957 * net_queue mutex is already held 958 */ 959 int 960 rdc_qfill_shldwakeup(rdc_k_info_t *krdc) 961 { 962 rdc_group_t *group = krdc->group; 963 rdc_u_info_t *urdc = &rdc_u_info[krdc->index]; 964 net_queue *nq = &group->ra_queue; 965 disk_queue *dq = &group->diskq; 966 967 ASSERT(MUTEX_HELD(&nq->net_qlock)); 968 969 if (!RDC_IS_DISKQ(krdc->group)) 970 return (0); 971 972 if (nq->qfill_sleeping != RDC_QFILL_ASLEEP) 973 return (0); 974 975 if (nq->qfflags & RDC_QFILLSTOP) 976 return (1); 977 978 if (nq->qfflags & RDC_QFILLSLEEP) 979 return (0); 980 981 if (IS_STATE(urdc, RDC_LOGGING) || IS_STATE(urdc, RDC_SYNCING)) 982 return (0); 983 984 mutex_enter(QLOCK(dq)); 985 if ((QNXTIO(dq) == QTAIL(dq)) && !IS_QSTATE(dq, RDC_QFULL)) { 986 mutex_exit(QLOCK(dq)); 987 return (0); 988 } 989 mutex_exit(QLOCK(dq)); 990 991 if (nq->qfill_sleeping == RDC_QFILL_ASLEEP) { 992 if (nq->hwmhit) { 993 if (nq->blocks <= RDC_LOW_QBLOCKS) { 994 nq->hwmhit = 0; 995 } else { 996 return (0); 997 } 998 } 999 #ifdef DEBUG_DISKQ_NOISY 1000 cmn_err(CE_NOTE, "!Waking up diskq->memq flusher, flags 0x%x" 1001 " idx: %d", rdc_get_vflags(urdc), urdc->index); 1002 #endif 1003 return (1); 1004 } 1005 return (0); 1006 1007 } 1008 1009 /* 1010 * rdc_diskq_enqueue 1011 * enqueue one i/o to the diskq 1012 * after appending some metadata to the front 1013 */ 1014 int 1015 rdc_diskq_enqueue(rdc_k_info_t *krdc, rdc_aio_t *aio) 1016 { 1017 nsc_vec_t *vec = NULL; 1018 nsc_buf_t *bp = NULL; 1019 nsc_buf_t *qbuf = NULL; 1020 io_hdr *iohdr = NULL; 1021 disk_queue *q; 1022 rdc_group_t *group; 1023 int numvecs; 1024 int i, j, rc = 0; 1025 int retries = 0; 1026 rdc_u_info_t *urdc; 1027 nsc_size_t iofbas; /* len of io + io header len */ 1028 int qtail; 1029 int delay_time = 2; 1030 int print_msg 
= 1; 1031 1032 #ifdef DEBUG_WRITER_UBERNOISE 1033 int qhead; 1034 #endif 1035 urdc = &rdc_u_info[krdc->index]; 1036 group = krdc->group; 1037 q = &group->diskq; 1038 1039 mutex_enter(QLOCK(q)); 1040 1041 /* 1042 * there is a thread that is blocking because the queue is full, 1043 * don't try to set up this write until all is clear 1044 * check before and after for logging or failed queue just 1045 * in case a thread was in flight while the queue was full, 1046 * and in the proccess of failing 1047 */ 1048 while (IS_QSTATE(q, RDC_QFULL)) { 1049 if (IS_STATE(urdc, RDC_DISKQ_FAILED) || 1050 (IS_STATE(urdc, RDC_LOGGING) && 1051 !IS_STATE(urdc, RDC_QUEUING))) { 1052 mutex_exit(QLOCK(q)); 1053 if (aio->handle) 1054 (void) nsc_free_buf(aio->handle); 1055 return (-1); 1056 } 1057 cv_wait(&q->qfullcv, QLOCK(q)); 1058 1059 if (IS_STATE(urdc, RDC_DISKQ_FAILED) || 1060 (IS_STATE(urdc, RDC_LOGGING) && 1061 !IS_STATE(urdc, RDC_QUEUING))) { 1062 mutex_exit(QLOCK(q)); 1063 if (aio->handle) 1064 (void) nsc_free_buf(aio->handle); 1065 return (-1); 1066 } 1067 1068 } 1069 1070 SET_QSTATE(q, QTAILBUSY); 1071 1072 if (aio->handle == NULL) { 1073 /* we're only going to write the header to the queue */ 1074 numvecs = 2; /* kmem_alloc io header + null terminate */ 1075 iofbas = FBA_LEN(sizeof (io_hdr)); 1076 1077 } else { 1078 /* find out how many vecs */ 1079 numvecs = rdc_count_vecs(aio->handle->sb_vec) + 1; 1080 iofbas = aio->len + FBA_LEN(sizeof (io_hdr)); 1081 } 1082 1083 /* 1084 * this, in conjunction with QTAILBUSY, will prevent 1085 * premature dequeuing 1086 */ 1087 1088 SET_LASTQTAIL(q, QTAIL(q)); 1089 1090 iohdr = (io_hdr *) kmem_zalloc(sizeof (io_hdr), KM_NOSLEEP); 1091 vec = (nsc_vec_t *) kmem_zalloc(sizeof (nsc_vec_t) * numvecs, 1092 KM_NOSLEEP); 1093 1094 if (!vec || !iohdr) { 1095 if (!vec) { 1096 cmn_err(CE_WARN, "!vec kmem alloc failed"); 1097 } else { 1098 cmn_err(CE_WARN, "!iohdr kmem alloc failed"); 1099 } 1100 if (vec) 1101 kmem_free(vec, sizeof (*vec)); 1102 if 
(iohdr) 1103 kmem_free(iohdr, sizeof (*iohdr)); 1104 CLR_QSTATE(q, QTAILBUSY); 1105 SET_LASTQTAIL(q, 0); 1106 mutex_exit(QLOCK(q)); 1107 if (aio->handle) 1108 (void) nsc_free_buf(aio->handle); 1109 return (ENOMEM); 1110 } 1111 1112 vec[numvecs - 1].sv_len = 0; 1113 vec[numvecs - 1].sv_addr = 0; 1114 1115 /* now add the write itself */ 1116 bp = aio->handle; 1117 1118 for (i = 1, j = 0; bp && bp->sb_vec[j].sv_addr && 1119 i < numvecs; i++, j++) { 1120 vec[i].sv_len = bp->sb_vec[j].sv_len; 1121 vec[i].sv_addr = bp->sb_vec[j].sv_addr; 1122 } 1123 1124 retry: 1125 1126 /* check for queue wrap, then check for overflow */ 1127 if (IS_STATE(urdc, RDC_DISKQ_FAILED) || 1128 (IS_STATE(urdc, RDC_LOGGING) && !IS_STATE(urdc, RDC_QUEUING))) { 1129 kmem_free(iohdr, sizeof (*iohdr)); 1130 kmem_free(vec, sizeof (*vec) * numvecs); 1131 CLR_QSTATE(q, QTAILBUSY); 1132 SET_LASTQTAIL(q, 0); 1133 if (IS_QSTATE(q, RDC_QFULL)) { /* wakeup blocked threads */ 1134 CLR_QSTATE(q, RDC_QFULL); 1135 cv_broadcast(&q->qfullcv); 1136 } 1137 mutex_exit(QLOCK(q)); 1138 if (aio->handle) 1139 (void) nsc_free_buf(aio->handle); 1140 1141 return (-1); 1142 } 1143 1144 if (QTAILSHLDWRAP(q, iofbas)) { 1145 /* 1146 * just go back to the beginning of the disk 1147 * it's not worth the trouble breaking up the write 1148 */ 1149 #ifdef DEBUG_DISKQWRAP 1150 cmn_err(CE_NOTE, "!wrapping Q tail: " QDISPLAY(q)); 1151 #endif 1152 /*LINTED*/ 1153 WRAPQTAIL(q); 1154 } 1155 1156 /* 1157 * prepend the write's metadata 1158 */ 1159 rdc_fill_ioheader(aio, iohdr, QTAIL(q)); 1160 1161 vec[0].sv_len = FBA_SIZE(1); 1162 vec[0].sv_addr = (uchar_t *)iohdr; 1163 1164 /* check for tail < head */ 1165 1166 if (!(FITSONQ(q, iofbas))) { 1167 /* 1168 * don't allow any more writes to start 1169 */ 1170 SET_QSTATE(q, RDC_QFULL); 1171 mutex_exit(QLOCK(q)); 1172 1173 if ((!group->rdc_writer) && !IS_STATE(urdc, RDC_LOGGING)) 1174 (void) rdc_writer(krdc->index); 1175 1176 delay(delay_time); 1177 q->throttle_delay += delay_time; 1178 
retries++; 1179 delay_time *= 2; /* fairly aggressive */ 1180 if ((retries >= 8) || (delay_time >= 256)) { 1181 delay_time = 2; 1182 if (print_msg) { 1183 cmn_err(CE_WARN, "!enqueue: disk queue %s full", 1184 &urdc->disk_queue[0]); 1185 print_msg = 0; 1186 #ifdef DEBUG 1187 cmn_err(CE_WARN, "!qinfo: " QDISPLAY(q)); 1188 #else 1189 cmn_err(CE_CONT, "!qinfo: " QDISPLAYND(q)); 1190 #endif 1191 } 1192 /* 1193 * if this is a no-block queue, or this is a blocking 1194 * queue that is not flushing. reset and log 1195 */ 1196 if ((QSTATE(q) & RDC_QNOBLOCK) || 1197 (IS_STATE(urdc, RDC_QUEUING))) { 1198 1199 if (IS_STATE(urdc, RDC_QUEUING)) { 1200 cmn_err(CE_WARN, "!SNDR: disk queue %s full and not flushing. " 1201 "giving up", &urdc->disk_queue[0]); 1202 cmn_err(CE_WARN, "!SNDR: %s:%s entering logging mode", 1203 urdc->secondary.intf, urdc->secondary.file); 1204 } 1205 1206 rdc_fail_diskq(krdc, RDC_WAIT, 1207 RDC_DOLOG | RDC_NOFAIL); 1208 kmem_free(iohdr, sizeof (*iohdr)); 1209 kmem_free(vec, sizeof (*vec) * numvecs); 1210 mutex_enter(QLOCK(q)); 1211 CLR_QSTATE(q, QTAILBUSY | RDC_QFULL); 1212 cv_broadcast(&q->qfullcv); 1213 mutex_exit(QLOCK(q)); 1214 SET_LASTQTAIL(q, 0); 1215 if (aio->handle) 1216 (void) nsc_free_buf(aio->handle); 1217 return (ENOMEM); 1218 } 1219 } 1220 1221 mutex_enter(QLOCK(q)); 1222 goto retry; 1223 1224 } 1225 1226 qtail = QTAIL(q); 1227 #ifdef DEBUG_WRITER_UBERNOISE 1228 qhead = QHEAD(q); 1229 #endif 1230 1231 /* update tail pointer, nitems on queue and blocks on queue */ 1232 INC_QTAIL(q, iofbas); /* increment tail over i/o size + ioheader size */ 1233 INC_QNITEMS(q, 1); 1234 /* increment counter for i/o blocks only */ 1235 INC_QBLOCKS(q, (iofbas - FBA_LEN(sizeof (io_hdr)))); 1236 1237 if (QNITEMS(q) > q->nitems_hwm) 1238 q->nitems_hwm = QNITEMS(q); 1239 if (QBLOCKS(q) > q->blocks_hwm) 1240 q->blocks_hwm = QBLOCKS(q); 1241 1242 if (IS_QSTATE(q, RDC_QFULL)) { 1243 CLR_QSTATE(q, RDC_QFULL); 1244 cv_broadcast(&q->qfullcv); 1245 } 1246 1247 
mutex_exit(QLOCK(q)); 1248 1249 /* 1250 * if (krdc->io_kstats) { 1251 * mutex_enter(krdc->io_kstats->ks_lock); 1252 * kstat_waitq_enter(KSTAT_IO_PTR(krdc->io_kstats)); 1253 * mutex_exit(krdc->io_kstats->ks_lock); 1254 * } 1255 */ 1256 1257 DTRACE_PROBE(rdc_diskq_rsrv); 1258 1259 if (_rdc_rsrv_diskq(group)) { 1260 cmn_err(CE_WARN, "!rdc_enqueue: %s reserve failed", 1261 &urdc->disk_queue[0]); 1262 rdc_fail_diskq(krdc, RDC_WAIT, RDC_DOLOG); 1263 kmem_free(iohdr, sizeof (*iohdr)); 1264 kmem_free(vec, sizeof (*vec) * numvecs); 1265 mutex_enter(QLOCK(q)); 1266 CLR_QSTATE(q, QTAILBUSY); 1267 SET_LASTQTAIL(q, 0); 1268 mutex_exit(QLOCK(q)); 1269 if (aio->handle) 1270 (void) nsc_free_buf(aio->handle); 1271 return (-1); 1272 } 1273 1274 /* XXX for now do this, but later pre-alloc handle in enable/resume */ 1275 1276 DTRACE_PROBE(rdc_diskq_alloc_start); 1277 rc = nsc_alloc_buf(group->diskqfd, qtail, iofbas, 1278 NSC_NOCACHE | NSC_WRITE | NSC_NODATA, &qbuf); 1279 1280 DTRACE_PROBE(rdc_diskq_alloc_end); 1281 1282 if (!RDC_SUCCESS(rc)) { 1283 cmn_err(CE_WARN, "!disk queue %s alloc failed(%d) %" NSC_SZFMT, 1284 &urdc->disk_queue[0], rc, iofbas); 1285 rdc_fail_diskq(krdc, RDC_WAIT, RDC_DOLOG); 1286 rc = ENOMEM; 1287 goto fail; 1288 } 1289 /* move vec and write to queue */ 1290 qbuf->sb_vec = &vec[0]; 1291 1292 #ifdef DEBUG_WRITER_UBERNOISE 1293 1294 cmn_err(CE_NOTE, "!about to write to queue, qbuf: %p, qhead: %d, " 1295 "qtail: %d, len: %d contents: %c%c%c%c%c", 1296 (void *) qbuf, qhead, qtail, iofbas, 1297 qbuf->sb_vec[1].sv_addr[0], 1298 qbuf->sb_vec[1].sv_addr[1], 1299 qbuf->sb_vec[1].sv_addr[2], 1300 qbuf->sb_vec[1].sv_addr[3], 1301 qbuf->sb_vec[1].sv_addr[4]); 1302 cmn_err(CE_CONT, "!qinfo: " QDISPLAYND(q)); 1303 1304 #endif 1305 1306 DTRACE_PROBE2(rdc_diskq_nswrite_start, int, qtail, nsc_size_t, iofbas); 1307 rc = nsc_write(qbuf, qtail, iofbas, 0); 1308 DTRACE_PROBE2(rdc_diskq_nswrite_end, int, qtail, nsc_size_t, iofbas); 1309 1310 if (!RDC_SUCCESS(rc)) { 1311 
		/* diskq write failed: log it, fail the whole diskq */
		cmn_err(CE_WARN, "!disk queue %s write failed %d",
		    &urdc->disk_queue[0], rc);
		rdc_fail_diskq(krdc, RDC_WAIT, RDC_DOLOG);
		/* fall into the common cleanup path with rc set */
		goto fail;

	}

	/* success: clear the tail-busy marker taken before the write */
	mutex_enter(QLOCK(q));

	SET_LASTQTAIL(q, 0);
	CLR_QSTATE(q, QTAILBUSY);

	mutex_exit(QLOCK(q));

fail:

	/*
	 * return what should be returned
	 * the aio is returned in _rdc_write after status is gathered.
	 */

	/*
	 * the vec array belongs to us, not the buf; detach it before
	 * freeing so nsc_free_buf() does not free our vectors
	 */
	if (qbuf)
		qbuf->sb_vec = 0;
	(void) nsc_free_buf(qbuf);

	if (aio->handle)
		(void) nsc_free_buf(aio->handle);

	_rdc_rlse_diskq(group);
	DTRACE_PROBE(rdc_diskq_rlse);

	/* free the iohdr and the vecs */

	if (iohdr)
		kmem_free(iohdr, sizeof (*iohdr));
	if (vec)
		kmem_free(vec, sizeof (*vec) * numvecs);

	/* if no flusher running, start one */
	if ((!krdc->group->rdc_writer) && !IS_STATE(urdc, RDC_LOGGING))
		(void) rdc_writer(krdc->index);

	return (rc);
}

/*
 * place this on the pending list of io_hdr's out for flushing
 * (appended in arrival order; dup qpos entries are dropped in DEBUG builds)
 */
void
rdc_add_iohdr(io_hdr *header, rdc_group_t *group)
{
	disk_queue *q = NULL;
#ifdef DEBUG
	io_hdr *p;
#endif

	q = &group->diskq;

	/* paranoia */
	header->dat.next = NULL;

	mutex_enter(QLOCK(q));
#ifdef DEBUG	/* AAAH! double flush!? */
	/* scan for a header already pending at the same queue position */
	p = q->iohdrs;
	while (p) {
		if (p->dat.qpos == header->dat.qpos) {
			cmn_err(CE_WARN, "!ADDING DUPLICATE HEADER %" NSC_SZFMT,
			    p->dat.qpos);
			kmem_free(header, sizeof (*header));
			mutex_exit(QLOCK(q));
			return;
		}
		p = p->dat.next;
	}
#endif
	/* empty list: header becomes both head and tail */
	if (q->iohdrs == NULL) {
		q->iohdrs = q->hdr_last = header;
		q->hdrcnt = 1;
		mutex_exit(QLOCK(q));
		return;
	}

	/* otherwise append at the tail */
	q->hdr_last->dat.next = header;
	q->hdr_last = header;
	q->hdrcnt++;
	mutex_exit(QLOCK(q));
	return;

}

/*
 * mark an io header as flushed.
If it is the qhead, 1403 * then update the qpointers 1404 * free the io_hdrs 1405 * called after the bitmap is cleared by flusher 1406 */ 1407 void 1408 rdc_clr_iohdr(rdc_k_info_t *krdc, nsc_size_t qpos) 1409 { 1410 rdc_group_t *group = krdc->group; 1411 disk_queue *q = NULL; 1412 io_hdr *hp = NULL; 1413 io_hdr *p = NULL; 1414 int found = 0; 1415 int cnt = 0; 1416 1417 #ifndef NSC_MULTI_TERABYTE 1418 ASSERT(qpos >= 0); /* assertion to validate change for 64bit */ 1419 if (qpos < 0) /* not a diskq offset */ 1420 return; 1421 #endif 1422 1423 q = &group->diskq; 1424 mutex_enter(QLOCK(q)); 1425 1426 hp = p = q->iohdrs; 1427 1428 /* find outstanding io_hdr */ 1429 while (hp) { 1430 if (hp->dat.qpos == qpos) { 1431 found++; 1432 break; 1433 } 1434 cnt++; 1435 p = hp; 1436 hp = hp->dat.next; 1437 } 1438 1439 if (!found) { 1440 if (RDC_BETWEEN(QHEAD(q), QNXTIO(q), qpos)) { 1441 #ifdef DEBUG 1442 cmn_err(CE_WARN, "!iohdr already cleared? " 1443 "qpos %" NSC_SZFMT " cnt %d ", qpos, cnt); 1444 cmn_err(CE_WARN, "!Qinfo: " QDISPLAY(q)); 1445 #endif 1446 mutex_exit(QLOCK(q)); 1447 return; 1448 } 1449 mutex_exit(QLOCK(q)); 1450 return; 1451 } 1452 1453 /* mark it as flushed */ 1454 hp->dat.iostatus = RDC_IOHDR_DONE; 1455 1456 /* 1457 * if it is the head pointer, travel the list updating the queue 1458 * pointers until the next unflushed is reached, freeing on the way. 
1459 */ 1460 while (hp && (hp->dat.qpos == QHEAD(q)) && 1461 (hp->dat.iostatus == RDC_IOHDR_DONE)) { 1462 #ifdef DEBUG_FLUSHER_UBERNOISE 1463 cmn_err(CE_NOTE, "!clr_iohdr info: magic %x type %d pos %d" 1464 " qpos %d hpos %d len %d flag 0x%x iostatus %x setid %d", 1465 hp->dat.magic, hp->dat.type, hp->dat.pos, hp->dat.qpos, 1466 hp->dat.hpos, hp->dat.len, hp->dat.flag, 1467 hp->dat.iostatus, hp->dat.setid); 1468 #endif 1469 if (hp->dat.flag & RDC_NULL_BUF) { 1470 INC_QHEAD(q, FBA_LEN(sizeof (io_hdr))); 1471 } else { 1472 INC_QHEAD(q, FBA_LEN(sizeof (io_hdr)) + hp->dat.len); 1473 DEC_QBLOCKS(q, hp->dat.len); 1474 } 1475 1476 DEC_QNITEMS(q, 1); 1477 1478 if (QHEADSHLDWRAP(q)) { /* simple enough */ 1479 #ifdef DEBUG_DISKQWRAP 1480 cmn_err(CE_NOTE, "!wrapping Q head: " QDISPLAY(q)); 1481 #endif 1482 /*LINTED*/ 1483 WRAPQHEAD(q); 1484 } 1485 1486 /* get rid of the iohdr */ 1487 if (hp == q->iohdrs) { 1488 q->iohdrs = hp->dat.next; 1489 kmem_free(hp, sizeof (*hp)); 1490 hp = q->iohdrs; 1491 } else { 1492 if (hp == q->hdr_last) 1493 q->hdr_last = p; 1494 p->dat.next = hp->dat.next; 1495 kmem_free(hp, sizeof (*hp)); 1496 hp = p->dat.next; 1497 } 1498 q->hdrcnt--; 1499 } 1500 1501 if (QEMPTY(q) && !IS_QSTATE(q, RDC_QFULL) && 1502 !(IS_QSTATE(q, RDC_QDISABLEPEND))) { 1503 #ifdef DEBUG_FLUSHER_UBERNOISE 1504 rdc_u_info_t *urdc = &rdc_u_info[krdc->index]; 1505 cmn_err(CE_NOTE, "!clr_iohdr: diskq %s empty, " 1506 "resetting defaults", urdc->disk_queue); 1507 #endif 1508 1509 rdc_init_diskq_header(group, &q->disk_hdr); 1510 SET_QNXTIO(q, QHEAD(q)); 1511 } 1512 1513 /* wakeup any blocked enqueue threads */ 1514 cv_broadcast(&q->qfullcv); 1515 mutex_exit(QLOCK(q)); 1516 } 1517 1518 /* 1519 * put in whatever useful checks we can on the io header 1520 */ 1521 int 1522 rdc_iohdr_ok(io_hdr *hdr) 1523 { 1524 if (hdr->dat.magic != RDC_IOHDR_MAGIC) 1525 goto bad; 1526 return (1); 1527 bad: 1528 1529 #ifdef DEBUG 1530 cmn_err(CE_WARN, "!Bad io header magic %x type %d pos %" NSC_SZFMT 1531 
" hpos %" NSC_SZFMT " qpos %" NSC_SZFMT " len %" NSC_SZFMT 1532 " flag %d iostatus %d setid %d", hdr->dat.magic, 1533 hdr->dat.type, hdr->dat.pos, hdr->dat.hpos, hdr->dat.qpos, 1534 hdr->dat.len, hdr->dat.flag, hdr->dat.iostatus, hdr->dat.setid); 1535 #else 1536 cmn_err(CE_WARN, "!Bad io header retrieved"); 1537 #endif 1538 return (0); 1539 } 1540 1541 /* 1542 * rdc_netqueue_insert() 1543 * add an item to a netqueue. No locks necessary as it should only 1544 * be used in a single threaded manor. If that changes, then 1545 * a lock or assertion should be done here 1546 */ 1547 void 1548 rdc_netqueue_insert(rdc_aio_t *aio, net_queue *q) 1549 { 1550 rdc_k_info_t *krdc = &rdc_k_info[aio->index]; 1551 1552 /* paranoid check for bit set */ 1553 RDC_CHECK_BIT(krdc, aio->pos, aio->len); 1554 1555 if (q->net_qhead == NULL) { 1556 q->net_qhead = q->net_qtail = aio; 1557 1558 } else { 1559 q->net_qtail->next = aio; 1560 q->net_qtail = aio; 1561 } 1562 q->blocks += aio->len; 1563 q->nitems++; 1564 1565 if (q->nitems > q->nitems_hwm) { 1566 q->nitems_hwm = q->nitems; 1567 } 1568 if (q->blocks > q->blocks_hwm) { 1569 q->nitems_hwm = q->blocks; 1570 } 1571 } 1572 1573 /* 1574 * rdc_fill_aio(aio, hdr) 1575 * take the pertinent info from an io_hdr and stick it in 1576 * an aio, including seq number, abuf. 
1577 */ 1578 void 1579 rdc_fill_aio(rdc_group_t *grp, rdc_aio_t *aio, io_hdr *hdr, nsc_buf_t *abuf) 1580 { 1581 if (hdr->dat.flag & RDC_NULL_BUF) { 1582 aio->handle = NULL; 1583 } else { 1584 aio->handle = abuf; 1585 } 1586 aio->qhandle = abuf; 1587 aio->pos = hdr->dat.pos; 1588 aio->qpos = hdr->dat.qpos; 1589 aio->len = hdr->dat.len; 1590 aio->flag = hdr->dat.flag; 1591 if ((aio->index = rdc_setid2idx(hdr->dat.setid)) < 0) 1592 return; 1593 mutex_enter(&grp->diskq.disk_qlock); 1594 if (grp->ra_queue.qfflags & RDC_QFILLSLEEP) { 1595 mutex_exit(&grp->diskq.disk_qlock); 1596 aio->seq = RDC_NOSEQ; 1597 return; 1598 } 1599 if (abuf && aio->qhandle) { 1600 abuf->sb_user++; 1601 } 1602 aio->seq = grp->seq++; 1603 if (grp->seq < aio->seq) 1604 grp->seq = RDC_NEWSEQ + 1; 1605 mutex_exit(&grp->diskq.disk_qlock); 1606 hdr->dat.iostatus = aio->seq; 1607 1608 } 1609 1610 #ifdef DEBUG 1611 int maxaios_perbuf = 0; 1612 int midaios_perbuf = 0; 1613 int aveaios_perbuf = 0; 1614 int totaios_perbuf = 0; 1615 int buf2qcalls = 0; 1616 1617 void 1618 calc_perbuf(int items) 1619 { 1620 if (totaios_perbuf < 0) { 1621 maxaios_perbuf = 0; 1622 midaios_perbuf = 0; 1623 aveaios_perbuf = 0; 1624 totaios_perbuf = 0; 1625 buf2qcalls = 0; 1626 } 1627 1628 if (items > maxaios_perbuf) 1629 maxaios_perbuf = items; 1630 midaios_perbuf = maxaios_perbuf / 2; 1631 totaios_perbuf += items; 1632 aveaios_perbuf = totaios_perbuf / buf2qcalls; 1633 } 1634 #endif 1635 1636 /* 1637 * rdc_discard_tmpq() 1638 * free up the passed temporary queue 1639 * NOTE: no cv's or mutexes have been initialized 1640 */ 1641 void 1642 rdc_discard_tmpq(net_queue *q) 1643 { 1644 rdc_aio_t *aio; 1645 1646 if (q == NULL) 1647 return; 1648 1649 while (q->net_qhead) { 1650 aio = q->net_qhead; 1651 q->net_qhead = q->net_qhead->next; 1652 if (aio->qhandle) { 1653 aio->qhandle->sb_user--; 1654 if (aio->qhandle->sb_user == 0) { 1655 rdc_fixlen(aio); 1656 (void) nsc_free_buf(aio->qhandle); 1657 } 1658 } 1659 kmem_free(aio, sizeof 
(*aio)); 1660 q->nitems--; 1661 } 1662 kmem_free(q, sizeof (*q)); 1663 1664 } 1665 1666 /* 1667 * rdc_diskq_buf2queue() 1668 * take a chunk of the diskq, parse it and assemble 1669 * a chain of rdc_aio_t's. 1670 * updates QNXTIO() 1671 */ 1672 net_queue * 1673 rdc_diskq_buf2queue(rdc_group_t *grp, nsc_buf_t **abuf, int index) 1674 { 1675 rdc_aio_t *aio = NULL; 1676 nsc_vec_t *vecp = NULL; 1677 uchar_t *vaddr = NULL; 1678 uchar_t *ioaddr = NULL; 1679 net_queue *netq = NULL; 1680 io_hdr *hdr = NULL; 1681 nsc_buf_t *buf = *abuf; 1682 rdc_u_info_t *urdc = &rdc_u_info[index]; 1683 rdc_k_info_t *krdc = &rdc_k_info[index]; 1684 disk_queue *dq = &grp->diskq; 1685 net_queue *nq = &grp->ra_queue; 1686 int nullbuf = 0; 1687 nsc_off_t endobuf; 1688 nsc_off_t bufoff; 1689 int vlen; 1690 nsc_off_t fpos; 1691 long bufcnt = 0; 1692 int nullblocks = 0; 1693 int fail = 1; 1694 1695 if (buf == NULL) 1696 return (NULL); 1697 1698 netq = kmem_zalloc(sizeof (*netq), KM_NOSLEEP); 1699 if (netq == NULL) { 1700 cmn_err(CE_WARN, "!SNDR: unable to allocate net queue"); 1701 return (NULL); 1702 } 1703 1704 vecp = buf->sb_vec; 1705 vlen = vecp->sv_len; 1706 vaddr = vecp->sv_addr; 1707 bufoff = buf->sb_pos; 1708 endobuf = bufoff + buf->sb_len; 1709 1710 #ifdef DEBUG_FLUSHER_UBERNOISE 1711 cmn_err(CE_WARN, "!BUFFOFFENTER %d", bufoff); 1712 #endif 1713 /* CONSTCOND */ 1714 while (1) { 1715 if (IS_STATE(urdc, RDC_LOGGING) || 1716 (nq->qfflags & RDC_QFILLSLEEP)) { 1717 fail = 0; 1718 goto fail; 1719 } 1720 #ifdef DEBUG_FLUSHER_UBERNOISE 1721 cmn_err(CE_WARN, "!BUFFOFF_0 %d", bufoff); 1722 #endif 1723 1724 if ((vaddr == NULL) || (vlen == 0)) 1725 break; 1726 1727 if (vlen <= 0) { 1728 vecp++; 1729 vaddr = vecp->sv_addr; 1730 vlen = vecp->sv_len; 1731 if (vaddr == NULL) 1732 break; 1733 } 1734 1735 /* get the iohdr information */ 1736 1737 hdr = kmem_zalloc(sizeof (*hdr), KM_NOSLEEP); 1738 if (hdr == NULL) { 1739 cmn_err(CE_WARN, 1740 "!SNDR: unable to alocate net queue header"); 1741 goto fail; 1742 
} 1743 1744 ioaddr = (uchar_t *)hdr; 1745 1746 bcopy(vaddr, ioaddr, sizeof (*hdr)); 1747 1748 if (!rdc_iohdr_ok(hdr)) { 1749 cmn_err(CE_WARN, 1750 "!unable to retrieve i/o data from queue %s " 1751 "at offset %" NSC_SZFMT " bp: %" NSC_SZFMT " bl: %" 1752 NSC_SZFMT, urdc->disk_queue, 1753 bufoff, buf->sb_pos, buf->sb_len); 1754 #ifdef DEBUG_DISKQ 1755 cmn_err(CE_WARN, "!FAILING QUEUE state: %x", 1756 rdc_get_vflags(urdc)); 1757 cmn_err(CE_WARN, "!qinfo: " QDISPLAY(dq)); 1758 cmn_err(CE_WARN, "!VADDR %p, IOADDR %p", vaddr, ioaddr); 1759 cmn_err(CE_WARN, "!BUF %p", buf); 1760 #endif 1761 cmn_err(CE_WARN, "!qinfo: " QDISPLAYND(dq)); 1762 1763 goto fail; 1764 } 1765 1766 nullbuf = hdr->dat.flag & RDC_NULL_BUF; 1767 1768 bufoff += FBA_NUM(sizeof (*hdr)); 1769 1770 /* out of buffer, set nxtio to re read this last hdr */ 1771 if (!nullbuf && ((bufoff + hdr->dat.len) > endobuf)) { 1772 break; 1773 } 1774 1775 bufcnt += FBA_NUM(sizeof (*hdr)); 1776 1777 aio = kmem_zalloc(sizeof (*aio), KM_NOSLEEP); 1778 if (aio == NULL) { 1779 bufcnt -= FBA_NUM(sizeof (*hdr)); 1780 cmn_err(CE_WARN, "!SNDR: net queue aio alloc failed"); 1781 goto fail; 1782 } 1783 1784 if (!nullbuf) { 1785 /* move to next iohdr in big buf */ 1786 bufoff += hdr->dat.len; 1787 bufcnt += hdr->dat.len; 1788 } 1789 1790 rdc_fill_aio(grp, aio, hdr, buf); 1791 1792 if (aio->index < 0) { 1793 cmn_err(CE_WARN, "!Set id %d not found or no longer " 1794 "enabled, failing disk queue", hdr->dat.setid); 1795 kmem_free(aio, sizeof (*aio)); 1796 goto fail; 1797 } 1798 if (aio->seq == RDC_NOSEQ) { 1799 kmem_free(aio, sizeof (*aio)); 1800 fail = 0; 1801 goto fail; 1802 } 1803 if (aio->handle == NULL) 1804 nullblocks += aio->len; 1805 1806 rdc_add_iohdr(hdr, grp); 1807 hdr = NULL; /* don't accidentally free on break or fail */ 1808 rdc_netqueue_insert(aio, netq); 1809 1810 /* no more buffer, skip the below logic */ 1811 if ((bufoff + FBA_NUM(sizeof (*hdr))) >= endobuf) { 1812 break; 1813 } 1814 1815 fpos = bufoff - buf->sb_pos; 
1816 vecp = buf->sb_vec; 1817 for (; fpos >= FBA_NUM(vecp->sv_len); vecp++) 1818 fpos -= FBA_NUM(vecp->sv_len); 1819 vlen = vecp->sv_len - FBA_SIZE(fpos); 1820 vaddr = vecp->sv_addr + FBA_SIZE(fpos); 1821 /* abuf = NULL; */ 1822 1823 } 1824 1825 /* free extraneous header */ 1826 if (hdr) { 1827 kmem_free(hdr, sizeof (*hdr)); 1828 hdr = NULL; 1829 } 1830 1831 /* 1832 * probably won't happen, but if we didn't goto fail, but 1833 * we don't contain anything meaningful.. return NULL 1834 * and let the flusher or the sleep/wakeup routines 1835 * decide 1836 */ 1837 if (netq && netq->nitems == 0) { 1838 kmem_free(netq, sizeof (*netq)); 1839 return (NULL); 1840 } 1841 1842 #ifdef DEBUG 1843 buf2qcalls++; 1844 calc_perbuf(netq->nitems); 1845 #endif 1846 if (IS_STATE(urdc, RDC_LOGGING) || 1847 nq->qfflags & RDC_QFILLSLEEP) { 1848 fail = 0; 1849 goto fail; 1850 } 1851 1852 mutex_enter(QLOCK(dq)); 1853 INC_QNXTIO(dq, bufcnt); 1854 mutex_exit(QLOCK(dq)); 1855 1856 netq->net_qtail->orig_len = nullblocks; /* overload */ 1857 1858 return (netq); 1859 1860 fail: 1861 1862 if (hdr) { 1863 kmem_free(hdr, sizeof (*hdr)); 1864 } 1865 1866 if (netq) { 1867 if (netq->nitems > 0) { 1868 /* the never can happen case ... */ 1869 if ((netq->nitems == 1) && 1870 (netq->net_qhead->handle == NULL)) { 1871 (void) nsc_free_buf(buf); 1872 *abuf = NULL; 1873 } 1874 1875 } 1876 rdc_discard_tmpq(netq); 1877 } 1878 1879 mutex_enter(QLOCK(dq)); 1880 rdc_dump_iohdrs(dq); 1881 mutex_exit(QLOCK(dq)); 1882 1883 if (fail) { /* real failure, not just state change */ 1884 #ifdef DEBUG 1885 cmn_err(CE_WARN, "!rdc_diskq_buf2queue: failing disk queue %s", 1886 urdc->disk_queue); 1887 #endif 1888 rdc_fail_diskq(krdc, RDC_NOWAIT, RDC_DOLOG); 1889 } 1890 1891 return (NULL); 1892 1893 } 1894 1895 /* 1896 * rdc_diskq_unqueue 1897 * remove one chunk from the diskq belonging to 1898 * rdc_k_info[index] 1899 * updates the head and tail pointers in the disk header 1900 * but does not write. 
The header should be written on ack 1901 * flusher should free whatever.. 1902 */ 1903 rdc_aio_t * 1904 rdc_diskq_unqueue(int index) 1905 { 1906 int rc, rc1, rc2; 1907 nsc_off_t qhead; 1908 int nullhandle = 0; 1909 io_hdr *iohdr; 1910 rdc_aio_t *aio = NULL; 1911 nsc_buf_t *buf = NULL; 1912 nsc_buf_t *abuf = NULL; 1913 rdc_group_t *group = NULL; 1914 disk_queue *q = NULL; 1915 rdc_k_info_t *krdc = &rdc_k_info[index]; 1916 rdc_u_info_t *urdc = &rdc_u_info[index]; 1917 1918 group = krdc->group; 1919 q = &group->diskq; 1920 1921 if (group->diskqfd == NULL) /* we've been disabled */ 1922 return (NULL); 1923 1924 aio = kmem_zalloc(sizeof (*aio), KM_NOSLEEP); 1925 if (!aio) { 1926 return (NULL); 1927 } 1928 1929 iohdr = kmem_zalloc(sizeof (*iohdr), KM_NOSLEEP); 1930 if (!iohdr) { 1931 kmem_free(aio, sizeof (*aio)); 1932 return (NULL); 1933 } 1934 1935 mutex_enter(QLOCK(q)); 1936 rdc_set_qbusy(q); /* make sure no one disables the queue */ 1937 mutex_exit(QLOCK(q)); 1938 1939 DTRACE_PROBE(rdc_diskq_unq_rsrv); 1940 1941 if (_rdc_rsrv_diskq(group)) { 1942 cmn_err(CE_WARN, "!rdc_unqueue: %s reserve failed", 1943 urdc->disk_queue); 1944 goto fail; 1945 } 1946 1947 mutex_enter(QHEADLOCK(q)); 1948 mutex_enter(QLOCK(q)); 1949 1950 if (IS_STATE(urdc, RDC_DISKQ_FAILED) || IS_STATE(urdc, RDC_LOGGING)) { 1951 rdc_clr_qbusy(q); 1952 mutex_exit(QLOCK(q)); 1953 mutex_exit(QHEADLOCK(q)); 1954 kmem_free(aio, sizeof (*aio)); 1955 kmem_free(iohdr, sizeof (*iohdr)); 1956 return (NULL); 1957 } 1958 1959 if (QNXTIOSHLDWRAP(q)) { 1960 #ifdef DEBUG_DISKQWRAP 1961 cmn_err(CE_NOTE, "!wrapping Q nxtio: " QDISPLAY(q)); 1962 #endif 1963 /*LINTED*/ 1964 WRAPQNXTIO(q); 1965 } 1966 1967 /* read the metainfo at q->nxt_io first */ 1968 if (QNXTIO(q) == QTAIL(q)) { /* empty */ 1969 1970 _rdc_rlse_diskq(group); 1971 if (q->lastio->handle) 1972 (void) nsc_free_buf(q->lastio->handle); 1973 bzero(&(*q->lastio), sizeof (*q->lastio)); 1974 1975 mutex_exit(QHEADLOCK(q)); 1976 rdc_clr_qbusy(q); 1977 
mutex_exit(QLOCK(q)); 1978 kmem_free(aio, sizeof (*aio)); 1979 kmem_free(iohdr, sizeof (*iohdr)); 1980 return (NULL); 1981 } 1982 1983 qhead = QNXTIO(q); 1984 1985 /* 1986 * have to drop the lock here, sigh. Cannot block incoming io 1987 * we have to wait until after this read to find out how 1988 * much to increment QNXTIO. Might as well grab the seq then too 1989 */ 1990 1991 while ((qhead == LASTQTAIL(q)) && (IS_QSTATE(q, QTAILBUSY))) { 1992 mutex_exit(QLOCK(q)); 1993 #ifdef DEBUG_DISKQ 1994 cmn_err(CE_NOTE, "!Qtail busy delay lastqtail: %d", qhead); 1995 #endif 1996 delay(5); 1997 mutex_enter(QLOCK(q)); 1998 } 1999 mutex_exit(QLOCK(q)); 2000 2001 DTRACE_PROBE(rdc_diskq_iohdr_read_start); 2002 2003 rc = rdc_ns_io(group->diskqfd, NSC_READ, qhead, 2004 (uchar_t *)iohdr, FBA_SIZE(1)); 2005 2006 DTRACE_PROBE(rdc_diskq_iohdr_read_end); 2007 2008 if (!RDC_SUCCESS(rc) || !rdc_iohdr_ok(iohdr)) { 2009 cmn_err(CE_WARN, "!unable to retrieve i/o data from queue %s" 2010 " at offset %" NSC_SZFMT " rc %d", urdc->disk_queue, 2011 qhead, rc); 2012 #ifdef DEBUG_DISKQ 2013 cmn_err(CE_WARN, "!qinfo: " QDISPLAY(q)); 2014 #endif 2015 mutex_exit(QHEADLOCK(q)); 2016 goto fail; 2017 } 2018 2019 /* XXX process buffer here, creating rdc_aio_t's */ 2020 2021 mutex_enter(QLOCK(q)); 2022 /* update the next pointer */ 2023 if (iohdr->dat.flag == RDC_NULL_BUF) { 2024 INC_QNXTIO(q, FBA_LEN(sizeof (io_hdr))); 2025 nullhandle = 1; 2026 } else { 2027 INC_QNXTIO(q, (FBA_LEN(sizeof (io_hdr)) + iohdr->dat.len)); 2028 } 2029 2030 aio->seq = group->seq++; 2031 if (group->seq < aio->seq) 2032 group->seq = RDC_NEWSEQ + 1; 2033 2034 mutex_exit(QLOCK(q)); 2035 mutex_exit(QHEADLOCK(q)); 2036 2037 #ifdef DEBUG_FLUSHER_UBERNOISE 2038 p = &iohdr->dat; 2039 cmn_err(CE_NOTE, "!unqueued iohdr from %d pos: %d len: %d flag: %d " 2040 "iostatus: %d setid: %d time: %d", qhead, p->pos, p->len, 2041 p->flag, p->iostatus, p->setid, p->time); 2042 #endif 2043 2044 if (nullhandle) /* nothing to get from queue */ 2045 
goto nullbuf; 2046 2047 /* now that we know how much to get (iohdr.dat.len), get it */ 2048 DTRACE_PROBE(rdc_diskq_unq_allocbuf1_start); 2049 2050 rc = nsc_alloc_buf(group->diskqfd, qhead + 1, iohdr->dat.len, 2051 NSC_NOCACHE | NSC_READ, &buf); 2052 2053 DTRACE_PROBE(rdc_diskq_unq_allocbuf1_end); 2054 2055 /* and get somewhere to keep it for a bit */ 2056 DTRACE_PROBE(rdc_diskq_unq_allocbuf2_start); 2057 2058 rc1 = nsc_alloc_abuf(qhead + 1, iohdr->dat.len, 0, &abuf); 2059 2060 DTRACE_PROBE(rdc_diskq_unq_allocbuf2_end); 2061 2062 if (!RDC_SUCCESS(rc) || !RDC_SUCCESS(rc1)) { /* uh-oh */ 2063 cmn_err(CE_WARN, "!disk queue %s read failure", 2064 urdc->disk_queue); 2065 goto fail; 2066 } 2067 2068 /* move it on over... */ 2069 rc2 = nsc_copy(buf, abuf, qhead + 1, qhead + 1, iohdr->dat.len); 2070 2071 if (!RDC_SUCCESS(rc2)) { 2072 #ifdef DEBUG 2073 cmn_err(CE_WARN, "!nsc_copy failed for diskq unqueue"); 2074 #endif 2075 goto fail; 2076 } 2077 2078 /* let go of the real buf, we've got the abuf */ 2079 (void) nsc_free_buf(buf); 2080 buf = NULL; 2081 2082 aio->handle = abuf; 2083 /* Hack in the original sb_pos */ 2084 aio->handle->sb_pos = iohdr->dat.hpos; 2085 2086 /* skip the RDC_HANDLE_LIMITS check */ 2087 abuf->sb_user |= RDC_DISKQUE; 2088 2089 nullbuf: 2090 if (nullhandle) { 2091 aio->handle = NULL; 2092 } 2093 2094 /* set up the rest of the aio values, seq set above ... 
*/ 2095 aio->pos = iohdr->dat.pos; 2096 aio->qpos = iohdr->dat.qpos; 2097 aio->len = iohdr->dat.len; 2098 aio->flag = iohdr->dat.flag; 2099 aio->index = rdc_setid2idx(iohdr->dat.setid); 2100 if (aio->index < 0) { /* uh-oh */ 2101 #ifdef DEBUG 2102 cmn_err(CE_WARN, "!rdc_diskq_unqueue: index < 0"); 2103 #endif 2104 goto fail; 2105 } 2106 2107 2108 #ifdef DEBUG_FLUSHER_UBERNOISE_STAMP 2109 h = &q->disk_hdr.h; 2110 cmn_err(CE_NOTE, "!stamping diskq header:\n" 2111 "magic: %x\nstate: %d\nhead_offset: %d\n" 2112 "tail_offset: %d\ndisk_size: %d\nnitems: %d\nblocks: %d\n", 2113 h->magic, h->state, h->head_offset, h->tail_offset, 2114 h->disk_size, h->nitems, h->blocks); 2115 #endif 2116 2117 _rdc_rlse_diskq(group); 2118 2119 mutex_enter(QLOCK(q)); 2120 rdc_clr_qbusy(q); 2121 mutex_exit(QLOCK(q)); 2122 2123 DTRACE_PROBE(rdc_diskq_unq_rlse); 2124 2125 iohdr->dat.iostatus = aio->seq; 2126 rdc_add_iohdr(iohdr, group); 2127 2128 #ifdef DEBUG_FLUSHER_UBERNOISE 2129 if (!nullhandle) { 2130 cmn_err(CE_NOTE, "!UNQUEUING, %p" 2131 " contents: %c%c%c%c%c pos: %d len: %d", 2132 (void *)aio->handle, 2133 aio->handle->sb_vec[0].sv_addr[0], 2134 aio->handle->sb_vec[0].sv_addr[1], 2135 aio->handle->sb_vec[0].sv_addr[2], 2136 aio->handle->sb_vec[0].sv_addr[3], 2137 aio->handle->sb_vec[0].sv_addr[4], 2138 aio->handle->sb_pos, aio->handle->sb_len); 2139 } else { 2140 cmn_err(CE_NOTE, "!UNQUEUING, NULL " QDISPLAY(q)); 2141 } 2142 cmn_err(CE_NOTE, "!qinfo: " QDISPLAY(q)); 2143 #endif 2144 2145 return (aio); 2146 2147 fail: 2148 if (aio) 2149 kmem_free(aio, sizeof (*aio)); 2150 if (iohdr) 2151 kmem_free(iohdr, sizeof (*iohdr)); 2152 if (buf) 2153 (void) nsc_free_buf(buf); 2154 if (abuf) 2155 (void) nsc_free_buf(abuf); 2156 2157 _rdc_rlse_diskq(group); 2158 #ifdef DEBUG 2159 cmn_err(CE_WARN, "!diskq_unqueue: failing diskq"); 2160 #endif 2161 mutex_enter(QLOCK(q)); 2162 rdc_clr_qbusy(q); 2163 mutex_exit(QLOCK(q)); 2164 2165 rdc_fail_diskq(krdc, RDC_NOWAIT, RDC_DOLOG); 2166 2167 return (NULL); 
2168 } 2169 2170 int 2171 rdc_diskq_inuse(rdc_set_t *set, char *diskq) 2172 { 2173 rdc_u_info_t *urdc; 2174 char *group; 2175 int index; 2176 2177 group = set->group_name; 2178 2179 ASSERT(MUTEX_HELD(&rdc_conf_lock)); 2180 2181 if ((rdc_lookup_bitmap(diskq) >= 0) || 2182 (rdc_lookup_configured(diskq) >= 0)) { 2183 return (1); 2184 } 2185 for (index = 0; index < rdc_max_sets; index++) { 2186 urdc = &rdc_u_info[index]; 2187 2188 if (!IS_ENABLED(urdc)) 2189 continue; 2190 2191 /* same diskq different group */ 2192 if ((strcmp(urdc->disk_queue, diskq) == 0) && 2193 (urdc->group_name[0] == '\0' || 2194 strcmp(urdc->group_name, group))) { 2195 return (1); 2196 } 2197 } 2198 /* last, but not least, lets see if someone is getting really funky */ 2199 if ((strcmp(set->disk_queue, set->primary.file) == 0) || 2200 (strcmp(set->disk_queue, set->primary.bitmap) == 0)) { 2201 return (1); 2202 } 2203 2204 return (0); 2205 2206 } 2207 2208 #ifdef DEBUG 2209 int maxlen = 0; 2210 int avelen = 0; 2211 int totalen = 0; 2212 int lencalls = 0; 2213 2214 void 2215 update_lenstats(int len) 2216 { 2217 if (lencalls == 0) { 2218 lencalls = 1; 2219 avelen = 0; 2220 maxlen = 0; 2221 totalen = 0; 2222 } 2223 2224 if (len > maxlen) 2225 maxlen = len; 2226 totalen += len; 2227 avelen = totalen / lencalls; 2228 } 2229 #endif 2230 2231 /* 2232 * rdc_calc_len() 2233 * returns the size of the diskq that can be read for dequeuing 2234 * always <= RDC_MAX_DISKQREAD 2235 */ 2236 int 2237 rdc_calc_len(rdc_k_info_t *krdc, disk_queue *dq) 2238 { 2239 nsc_size_t len = 0; 2240 2241 ASSERT(MUTEX_HELD(QLOCK(dq))); 2242 2243 /* ---H-----N-----T--- */ 2244 if (QNXTIO(dq) < QTAIL(dq)) { 2245 2246 len = min(RDC_MAX_DISKQREAD, QTAIL(dq) - QNXTIO(dq)); 2247 2248 /* ---T-----H-----N--- */ 2249 } else if (QNXTIO(dq) > QTAIL(dq)) { 2250 if (QWRAP(dq)) { 2251 len = min(RDC_MAX_DISKQREAD, QWRAP(dq) - QNXTIO(dq)); 2252 } else { /* should never happen */ 2253 len = min(RDC_MAX_DISKQREAD, QSIZE(dq) - QNXTIO(dq)); 2254 } 
2255 } else if (QNXTIO(dq) == QTAIL(dq)) { 2256 if (QWRAP(dq) && !IS_QSTATE(dq, QNXTIOWRAPD)) 2257 len = min(RDC_MAX_DISKQREAD, QWRAP(dq) - QNXTIO(dq)); 2258 } 2259 2260 len = min(len, krdc->maxfbas); 2261 2262 #ifdef DEBUG 2263 lencalls++; 2264 update_lenstats(len); 2265 #endif 2266 2267 return ((int)len); 2268 } 2269 2270 /* 2271 * lie a little if we can, so we don't get tied up in 2272 * _nsc_wait_dbuf() on the next read. sb_len MUST be 2273 * restored before nsc_free_buf() however, or we will 2274 * be looking at memory leak city.. 2275 * so update the entire queue with the info as well 2276 * and the one that ends up freeing it, can fix the len 2277 * IMPORTANT: This assumes that we are not cached, in 2278 * 3.2 caching was turned off for data volumes, if that 2279 * changes, then this must too 2280 */ 2281 void 2282 rdc_trim_buf(nsc_buf_t *buf, net_queue *q) 2283 { 2284 rdc_aio_t *p; 2285 int len; 2286 2287 if (buf == NULL || q == NULL) 2288 return; 2289 2290 if (q && (buf->sb_len > 2291 (q->blocks + q->nitems - q->net_qtail->orig_len))) { 2292 len = buf->sb_len; 2293 buf->sb_len = (q->blocks + q->nitems - q->net_qtail->orig_len); 2294 } 2295 2296 p = q->net_qhead; 2297 do { 2298 p->orig_len = len; 2299 p = p->next; 2300 2301 } while (p); 2302 2303 } 2304 2305 /* 2306 * rdc_read_diskq_buf() 2307 * read a large as possible chunk of the diskq into a nsc_buf_t 2308 * and convert it to a net_queue of rdc_aio_t's to be appended 2309 * to the group's netqueue 2310 */ 2311 net_queue * 2312 rdc_read_diskq_buf(int index) 2313 { 2314 nsc_buf_t *buf = NULL; 2315 net_queue *tmpnq = NULL; 2316 disk_queue *dq = NULL; 2317 rdc_k_info_t *krdc = &rdc_k_info[index]; 2318 rdc_u_info_t *urdc = &rdc_u_info[index]; 2319 rdc_group_t *group = krdc->group; 2320 net_queue *nq = &group->ra_queue; 2321 int len = 0; 2322 int rc; 2323 int fail = 0; 2324 int offset = 0; 2325 2326 if (group == NULL || group->diskqfd == NULL) { 2327 DTRACE_PROBE(rdc_read_diskq_buf_bail1); 2328 return (NULL); 
2329 } 2330 2331 dq = &group->diskq; 2332 2333 mutex_enter(QLOCK(dq)); 2334 rdc_set_qbusy(dq); /* prevent disables on the queue */ 2335 mutex_exit(QLOCK(dq)); 2336 2337 if (_rdc_rsrv_diskq(group)) { 2338 cmn_err(CE_WARN, "!rdc_readdiskqbuf: %s reserve failed", 2339 urdc->disk_queue); 2340 mutex_enter(QLOCK(dq)); 2341 rdc_clr_qbusy(dq); /* prevent disables on the queue */ 2342 mutex_exit(QLOCK(dq)); 2343 return (NULL); 2344 } 2345 2346 mutex_enter(QHEADLOCK(dq)); 2347 mutex_enter(QLOCK(dq)); 2348 2349 if (IS_STATE(urdc, RDC_DISKQ_FAILED) || 2350 IS_STATE(urdc, RDC_LOGGING) || 2351 (nq->qfflags & RDC_QFILLSLEEP)) { 2352 mutex_exit(QLOCK(dq)); 2353 mutex_exit(QHEADLOCK(dq)); 2354 DTRACE_PROBE(rdc_read_diskq_buf_bail2); 2355 goto done; 2356 } 2357 2358 /* 2359 * real corner case here, we need to let the flusher wrap first. 2360 * we've gotten too far ahead, so just delay and try again 2361 */ 2362 if (IS_QSTATE(dq, QNXTIOWRAPD) && AUXQWRAP(dq)) { 2363 mutex_exit(QLOCK(dq)); 2364 mutex_exit(QHEADLOCK(dq)); 2365 goto done; 2366 } 2367 2368 if (QNXTIOSHLDWRAP(dq)) { 2369 #ifdef DEBUG_DISKQWRAP 2370 cmn_err(CE_NOTE, "!wrapping Q nxtio: " QDISPLAY(dq)); 2371 #endif 2372 /*LINTED*/ 2373 WRAPQNXTIO(dq); 2374 } 2375 2376 /* read the metainfo at q->nxt_io first */ 2377 if (!QNITEMS(dq)) { /* empty */ 2378 2379 if (dq->lastio->handle) 2380 (void) nsc_free_buf(dq->lastio->handle); 2381 bzero(&(*dq->lastio), sizeof (*dq->lastio)); 2382 mutex_exit(QLOCK(dq)); 2383 mutex_exit(QHEADLOCK(dq)); 2384 DTRACE_PROBE(rdc_read_diskq_buf_bail3); 2385 goto done; 2386 } 2387 2388 2389 len = rdc_calc_len(krdc, dq); 2390 2391 if ((len <= 0) || (IS_STATE(urdc, RDC_LOGGING)) || 2392 (IS_STATE(urdc, RDC_DISKQ_FAILED)) || 2393 (nq->qfflags & RDC_QFILLSLEEP)) { 2394 mutex_exit(QLOCK(dq)); 2395 mutex_exit(QHEADLOCK(dq)); 2396 /* 2397 * a write could be trying to get on the queue, or if 2398 * the queue is really really small, a complete image 2399 * of it could be on the net queue waiting for flush. 
2400 * the latter being a fairly stupid scenario and a gross 2401 * misconfiguration.. but what the heck, why make the thread 2402 * thrash around.. just pause a little here. 2403 */ 2404 if (len <= 0) 2405 delay(50); 2406 2407 DTRACE_PROBE3(rdc_read_diskq_buf_bail4, int, len, 2408 int, rdc_get_vflags(urdc), int, nq->qfflags); 2409 2410 goto done; 2411 } 2412 2413 DTRACE_PROBE2(rdc_calc_len, int, len, int, (int)QNXTIO(dq)); 2414 2415 #ifdef DEBUG_FLUSHER_UBERNOISE 2416 cmn_err(CE_WARN, "!CALC_LEN(%d) h:%d n%d t%d, w%d", 2417 len, QHEAD(dq), QNXTIO(dq), QTAIL(dq), QWRAP(dq)); 2418 cmn_err(CE_CONT, "!qinfo: " QDISPLAYND(dq)); 2419 #endif 2420 SET_QCOALBOUNDS(dq, QNXTIO(dq) + len); 2421 2422 while ((LASTQTAIL(dq) > 0) && !QWRAP(dq) && 2423 ((QNXTIO(dq) + len) >= LASTQTAIL(dq)) && 2424 (IS_QSTATE(dq, QTAILBUSY))) { 2425 mutex_exit(QLOCK(dq)); 2426 2427 #ifdef DEBUG_FLUSHER_UBERNOISE 2428 cmn_err(CE_NOTE, "!Qtail busy delay nxtio %d len %d " 2429 "lastqtail: %d", QNXTIO(dq), len, LASTQTAIL(dq)); 2430 #endif 2431 delay(20); 2432 mutex_enter(QLOCK(dq)); 2433 } 2434 2435 offset = QNXTIO(dq); 2436 2437 /* 2438 * one last check to see if we have gone logging, or should. 
2439 * we may have released the mutex above, so check again 2440 */ 2441 if ((IS_STATE(urdc, RDC_LOGGING)) || 2442 (IS_STATE(urdc, RDC_DISKQ_FAILED)) || 2443 (nq->qfflags & RDC_QFILLSLEEP)) { 2444 mutex_exit(QLOCK(dq)); 2445 mutex_exit(QHEADLOCK(dq)); 2446 goto done; 2447 } 2448 2449 mutex_exit(QLOCK(dq)); 2450 mutex_exit(QHEADLOCK(dq)); 2451 2452 DTRACE_PROBE2(rdc_buf2q_preread, int, offset, int, len); 2453 2454 rc = nsc_alloc_buf(group->diskqfd, offset, len, 2455 NSC_NOCACHE | NSC_READ, &buf); 2456 2457 if (!RDC_SUCCESS(rc)) { 2458 cmn_err(CE_WARN, "!disk queue %s read failure pos %" NSC_SZFMT 2459 " len %d", urdc->disk_queue, QNXTIO(dq), len); 2460 fail++; 2461 buf = NULL; 2462 DTRACE_PROBE(rdc_read_diskq_buf_bail5); 2463 goto done; 2464 } 2465 2466 DTRACE_PROBE2(rdc_buf2q_postread, int, offset, nsc_size_t, buf->sb_len); 2467 2468 /* 2469 * convert buf to a net_queue. buf2queue will 2470 * update the QNXTIO pointer for us, based on 2471 * the last readable queue item 2472 */ 2473 tmpnq = rdc_diskq_buf2queue(group, &buf, index); 2474 2475 #ifdef DEBUG_FLUSHER_UBERNOISE 2476 cmn_err(CE_NOTE, "!QBUF p: %d l: %d p+l: %d users: %d qblocks: %d ", 2477 "qitems: %d WASTED: %d", buf->sb_pos, buf->sb_len, 2478 buf->sb_pos+buf->sb_len, buf->sb_user, tmpnq?tmpnq->blocks:-1, 2479 tmpnq?tmpnq->nitems:-1, 2480 tmpnq?((buf->sb_len-tmpnq->nitems) - tmpnq->blocks):-1); 2481 #endif 2482 2483 DTRACE_PROBE3(rdc_buf2que_returned, net_queue *, tmpnq?tmpnq:0, 2484 uint64_t, tmpnq?tmpnq->nitems:0, 2485 uint_t, tmpnq?tmpnq->net_qhead->seq:0); 2486 done: 2487 2488 /* we don't need to retain the buf */ 2489 if (tmpnq == NULL) 2490 if (buf) { 2491 (void) nsc_free_buf(buf); 2492 buf = NULL; 2493 } 2494 2495 rdc_trim_buf(buf, tmpnq); 2496 2497 mutex_enter(QLOCK(dq)); 2498 rdc_clr_qbusy(dq); 2499 mutex_exit(QLOCK(dq)); 2500 2501 _rdc_rlse_diskq(group); 2502 2503 if (fail) { 2504 rdc_fail_diskq(krdc, RDC_NOWAIT, RDC_DOLOG); 2505 tmpnq = NULL; 2506 } 2507 2508 return (tmpnq); 2509 } 2510 2511 /* 
 * rdc_dequeue()
 * removes the head of the memory queue
 * returns the aio (or NULL); sets *rc to EAGAIN when the memory
 * queue is empty but the diskq still holds unflushed items
 */
rdc_aio_t *
rdc_dequeue(rdc_k_info_t *krdc, int *rc)
{
	net_queue *q = &krdc->group->ra_queue;
	disk_queue *dq = &krdc->group->diskq;
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	rdc_aio_t *aio;

	*rc = 0;

	if (q == NULL)
		return (NULL);

	mutex_enter(&q->net_qlock);

	aio = q->net_qhead;

	if (aio == NULL) {
#ifdef DEBUG
		/* empty queue must have fully-zeroed accounting */
		if (q->nitems != 0 || q->blocks != 0 || q->net_qtail != 0) {
			cmn_err(CE_PANIC,
			    "rdc_dequeue(1): q %p, q blocks %" NSC_SZFMT
			    " , nitems %" NSC_SZFMT ", qhead %p qtail %p",
			    (void *) q, q->blocks, q->nitems,
			    (void *) aio, (void *) q->net_qtail);
		}
#endif

		mutex_exit(&q->net_qlock);

		/*
		 * nothing in memory, but the diskq still has items and
		 * nothing says we should stop: ask the caller to retry
		 */
		if ((!IS_STATE(urdc, RDC_LOGGING)) &&
		    (!(q->qfflags & RDC_QFILLSLEEP)) &&
		    (!IS_STATE(urdc, RDC_SYNCING)) && (QNITEMS(dq) > 0)) {
			*rc = EAGAIN;
		}

		goto done;
	}

	/* aio remove from q */

	q->net_qhead = aio->next;
	aio->next = NULL;

	if (q->net_qtail == aio)
		q->net_qtail = q->net_qhead;

	q->blocks -= aio->len;
	q->nitems--;

#ifdef DEBUG
	if (q->net_qhead == NULL) {
		if (q->nitems != 0 || q->blocks != 0 || q->net_qtail != 0) {
			cmn_err(CE_PANIC, "rdc_dequeue(2): q %p, q blocks %"
			    NSC_SZFMT " nitems %" NSC_SZFMT
			    " , qhead %p qtail %p",
			    (void *) q, q->blocks, q->nitems,
			    (void *) q->net_qhead, (void *) q->net_qtail);
		}
	}
#endif
	mutex_exit(&q->net_qlock);
done:

	/* reacquire: the wakeup check and EAGAIN filter need the lock */
	mutex_enter(&q->net_qlock);

	if (rdc_qfill_shldwakeup(krdc))
		cv_broadcast(&q->qfcv);

	/*
	 * clear EAGAIN if
	 * logging or q filler thread is sleeping or stopping altogether
	 * or if q filler thread is dead already
	 * or if syncing, this will return a null aio, with no error code set
	 * telling the flusher to die
	 */
	if (*rc == EAGAIN) {
		if (IS_STATE(urdc, RDC_LOGGING) ||
		    (q->qfflags & (RDC_QFILLSLEEP | RDC_QFILLSTOP)) ||
		    (IS_QSTATE(dq, (RDC_QDISABLEPEND | RDC_STOPPINGFLUSH))) ||
		    (q->qfill_sleeping == RDC_QFILL_DEAD) ||
		    (IS_STATE(urdc, RDC_SYNCING)))
			*rc = 0;
	}

	mutex_exit(&q->net_qlock);

	return (aio);

}

/*
 * rdc_qfill_shldsleep()
 * returns 1 if the qfilling code should cv_wait() 0 if not.
 * reasons for going into cv_wait();
 * there is nothing in the diskq to flush to mem.
 * the memory queue has gotten too big and needs more flushing attn.
 */
int
rdc_qfill_shldsleep(rdc_k_info_t *krdc)
{
	net_queue *nq = &krdc->group->ra_queue;
	disk_queue *dq = &krdc->group->diskq;
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];

	ASSERT(MUTEX_HELD(&nq->net_qlock));

	/* memory-queue-only sets have no filler work at all */
	if (!RDC_IS_DISKQ(krdc->group))
		return (1);

	if (nq->qfflags & RDC_QFILLSLEEP) {
#ifdef DEBUG_DISKQ_NOISY
	cmn_err(CE_NOTE, "!Sleeping diskq->memq flusher: QFILLSLEEP idx: %d",
	    krdc->index);
#endif
		return (1);
	}

	if (IS_STATE(urdc, RDC_LOGGING) || IS_STATE(urdc, RDC_SYNCING)) {
#ifdef DEBUG_DISKQ_NOISY
	cmn_err(CE_NOTE, "!Sleeping diskq->memq flusher: Sync|Log (0x%x)"
	    " idx: %d", rdc_get_vflags(urdc), urdc->index);
#endif
		return (1);
	}

	mutex_enter(QLOCK(dq));
	/* nothing new on disk to pull into memory */
	if ((QNXTIO(dq) == QTAIL(dq)) && !IS_QSTATE(dq, RDC_QFULL)) {
#ifdef DEBUG_DISKQ_NOISY
		cmn_err(CE_NOTE, "!Sleeping diskq->memq flusher: QEMPTY");
#endif
		mutex_exit(QLOCK(dq));
		return (1);
	}
	mutex_exit(QLOCK(dq));

	/* memory queue is at capacity; let the flushers catch up */
	if (nq->blocks >= RDC_MAX_QBLOCKS) {
		nq->hwmhit = 1;
		/* stuck flushers ? */
#ifdef DEBUG_DISKQ_NOISY
		cmn_err(CE_NOTE, "!Sleeping diskq->memq flusher: memq full:"
		    " seq: %d seqack %d", krdc->group->seq,
		    krdc->group->seqack);
#endif
		return (1);
	}

	return (0);
}

/*
 * rdc_join_netqueues(a, b)
 * appends queue b to queue a updating all the queue info
 * as it is assumed queue a is the important one,
 * it's mutex must be held. no one can add to queue b
 */
void
rdc_join_netqueues(net_queue *q, net_queue *tmpq)
{

	ASSERT(MUTEX_HELD(&q->net_qlock));

	if (q->net_qhead == NULL) {	/* empty */
#ifdef DEBUG
		if (q->blocks != 0 || q->nitems != 0) {
			cmn_err(CE_PANIC, "rdc filler: q %p, qhead 0, "
			    " q blocks %" NSC_SZFMT ", nitems %" NSC_SZFMT,
			    (void *) q, q->blocks, q->nitems);
		}
#endif
		q->net_qhead = tmpq->net_qhead;
		q->net_qtail = tmpq->net_qtail;
		q->nitems = tmpq->nitems;
		q->blocks = tmpq->blocks;
	} else {
		q->net_qtail->next = tmpq->net_qhead;
		q->net_qtail = tmpq->net_qtail;
		q->nitems += tmpq->nitems;
		q->blocks += tmpq->blocks;
	}

	if (q->nitems > q->nitems_hwm) {
		q->nitems_hwm = q->nitems;
	}

	if (q->blocks > q->blocks_hwm) {
		q->blocks_hwm = q->blocks;
	}
}

/*
 * rdc_qfiller_thr() single thread that moves
 * data from the diskq to a memory queue for
 * the flusher to pick up.
 */
void
rdc_qfiller_thr(rdc_k_info_t *krdc)
{
	rdc_group_t *grp = krdc->group;
	rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
	net_queue *q = &grp->ra_queue;
	net_queue *tmpq = NULL;
	int index = krdc->index;

	q->qfill_sleeping = RDC_QFILL_AWAKE;
	/* run until someone sets RDC_QFILLSTOP */
	while (!(q->qfflags & RDC_QFILLSTOP)) {
		/* skip reading when there is no diskq or we should not fill */
		if (!RDC_IS_DISKQ(grp) ||
		    IS_STATE(urdc, RDC_LOGGING) ||
		    IS_STATE(urdc, RDC_DISKQ_FAILED) ||
		    (q->qfflags & RDC_QFILLSLEEP)) {
			goto nulltmpq;
		}

		DTRACE_PROBE(qfiller_top);
		tmpq = rdc_read_diskq_buf(index);

		if (tmpq == NULL)
			goto nulltmpq;

		/* state may have changed while we were reading; discard */
		if ((q->qfflags & RDC_QFILLSLEEP) ||
		    IS_STATE(urdc, RDC_LOGGING)) {
			rdc_discard_tmpq(tmpq);
			goto nulltmpq;
		}

		mutex_enter(&q->net_qlock);

		/* race with log, redundant yet paranoid */
		if ((q->qfflags & RDC_QFILLSLEEP) ||
		    IS_STATE(urdc, RDC_LOGGING)) {
			rdc_discard_tmpq(tmpq);
			mutex_exit(&q->net_qlock);
			goto nulltmpq;
		}

		/* hand the freshly read entries over to the memory queue */
		rdc_join_netqueues(q, tmpq);
		kmem_free(tmpq, sizeof (*tmpq));
		tmpq = NULL;

		mutex_exit(&q->net_qlock);
nulltmpq:
		/*
		 * sleep for a while if we can.
		 * the enqueuing or flushing code will
		 * wake us if necessary.
		 */
		mutex_enter(&q->net_qlock);
		while (rdc_qfill_shldsleep(krdc)) {
			q->qfill_sleeping = RDC_QFILL_ASLEEP;
			DTRACE_PROBE(qfiller_sleep);
			cv_wait(&q->qfcv, &q->net_qlock);
			DTRACE_PROBE(qfiller_wakeup);
			q->qfill_sleeping = RDC_QFILL_AWAKE;
			if (q->qfflags & RDC_QFILLSTOP) {
#ifdef DEBUG_DISKQ
				cmn_err(CE_NOTE,
				    "!rdc_qfiller_thr: recieved kill signal");
#endif
				mutex_exit(&q->net_qlock);
				goto done;
			}
		}
		mutex_exit(&q->net_qlock);

		DTRACE_PROBE(qfiller_bottom);
	}
done:
	DTRACE_PROBE(qfiller_done);
	q->qfill_sleeping = RDC_QFILL_DEAD;	/* the big sleep */

#ifdef DEBUG
	cmn_err(CE_NOTE, "!rdc_qfiller_thr stopping");
#endif
	/* acknowledge the stop request so the killer knows we are gone */
	q->qfflags &= ~RDC_QFILLSTOP;

}

/*
 * _rdc_add_diskq()
 * attach the disk queue named by pathname diskq to the group that
 * set "index" belongs to, propagating the queue name to every set
 * in the group.
 * Returns 0 on success, -1 or RDC_EQNOADD on failure.
 */
int
_rdc_add_diskq(int index, char *diskq)
{
	rdc_k_info_t *krdc, *kp;
	rdc_u_info_t *urdc, *up;
	rdc_group_t *group;
	int rc;

	krdc = &rdc_k_info[index];
	urdc = &rdc_u_info[index];
	group = krdc->group;

	/*
	 * NOTE(review): this also fires when a queue is already
	 * configured (disk_queue[0] set), not only for a NULL diskq,
	 * despite the debug message below.
	 */
	if (!diskq || urdc->disk_queue[0]) { /* how'd that happen? */
#ifdef DEBUG
		cmn_err(CE_WARN, "!NULL diskq in _rdc_add_diskq");
#endif
		rc = -1;
		goto fail;
	}

	/* if the enable fails, this is bzero'ed */
	(void) strncpy(urdc->disk_queue, diskq, NSC_MAXPATH);
	group->flags &= ~RDC_MEMQUE;
	group->flags |= RDC_DISKQUE;

#ifdef DEBUG
	cmn_err(CE_NOTE, "!adding diskq to group %s", urdc->group_name);
#endif
	mutex_enter(&rdc_conf_lock);
	rc = rdc_enable_diskq(krdc);
	mutex_exit(&rdc_conf_lock);

	if (rc == RDC_EQNOADD) {
		goto fail;
	}

	/* propagate the queue name to every other set in the group */
	RDC_ZERO_BITREF(krdc);
	for (kp = krdc->group_next; kp != krdc; kp = kp->group_next) {
		up = &rdc_u_info[kp->index];
		(void) strncpy(up->disk_queue, diskq, NSC_MAXPATH);
		/* size lives in the diskq structure, already set by enable */
		RDC_ZERO_BITREF(kp);
	}

fail:
	return (rc);

}

/*
 * add a diskq to an existing set/group
 *
 * Validates that every set in the group is async and logging, that
 * each bitmap volume is large enough, and that the queue volume is
 * not already in use, before calling _rdc_add_diskq().  Error detail
 * is returned to userland via kstatus; returns an RDC_E* code or 0.
 */
int
rdc_add_diskq(rdc_config_t *uparms, spcs_s_info_t kstatus)
{
	char *diskq;
	int rc;
	int index;
	rdc_k_info_t *krdc, *this;
	rdc_u_info_t *urdc;
	rdc_group_t *group;
	nsc_size_t vol_size = 0;
	nsc_size_t req_size = 0;

	mutex_enter(&rdc_conf_lock);
	index = rdc_lookup_byname(uparms->rdc_set);
	mutex_exit(&rdc_conf_lock);
	if (index < 0) {
		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
		    uparms->rdc_set->secondary.file);
		rc = RDC_EALREADY;
		goto failed;
	}
	urdc = &rdc_u_info[index];
	krdc = &rdc_k_info[index];
	this = &rdc_k_info[index];
	group = krdc->group;
	diskq = uparms->rdc_set->disk_queue;

	/*
	 * NOTE(review): the status detail added is RDC_EQWRONGMODE but
	 * the code returned is RDC_EQNOQUEUE -- looks inconsistent;
	 * confirm against the userland consumers before changing.
	 */
	if (!IS_ASYNC(urdc)) {
		spcs_s_add(kstatus, RDC_EQWRONGMODE, urdc->primary.intf,
		    urdc->primary.file, urdc->secondary.intf,
		    urdc->secondary.file);
		rc = RDC_EQNOQUEUE;
		goto failed;
	}

	/* walk the whole group: each set must be logging with a big
	 * enough bitmap volume */
	do {
		if (!IS_STATE(urdc, RDC_LOGGING)) {
			spcs_s_add(kstatus, RDC_EQNOTLOGGING,
			    uparms->rdc_set->disk_queue);
			rc = RDC_EQNOTLOGGING;
			goto failed;
		}
		/* make sure that we have enough bitmap vol */
		req_size = RDC_BITMAP_FBA + FBA_LEN(krdc->bitmap_size);
		req_size += FBA_LEN(krdc->bitmap_size * BITS_IN_BYTE);

		rc = _rdc_rsrv_devs(krdc, RDC_BMP, RDC_INTERNAL);

		if (!RDC_SUCCESS(rc)) {
			cmn_err(CE_WARN,
			    "!rdc_open_diskq: Bitmap reserve failed");
			spcs_s_add(kstatus, RDC_EBITMAP,
			    urdc->primary.bitmap);
			rc = RDC_EBITMAP;
			goto failed;
		}

		(void) nsc_partsize(krdc->bitmapfd, &vol_size);

		_rdc_rlse_devs(krdc, RDC_BMP);

		if (vol_size < req_size) {
			spcs_s_add(kstatus, RDC_EBITMAP2SMALL,
			    urdc->primary.bitmap);
			rc = RDC_EBITMAP2SMALL;
			goto failed;
		}

		krdc = krdc->group_next;
		urdc = &rdc_u_info[krdc->index];

	} while (krdc != this);

	if (urdc->disk_queue[0] != '\0') {
		spcs_s_add(kstatus, RDC_EQALREADY, urdc->primary.intf,
		    urdc->primary.file, urdc->secondary.intf,
		    urdc->secondary.file);
		rc = RDC_EQALREADY;
		goto failed;
	}

	if (uparms->options & RDC_OPT_SECONDARY) { /* how'd we get here? */
		spcs_s_add(kstatus, RDC_EQWRONGMODE);
		rc = RDC_EQWRONGMODE;
		goto failed;
	}

	mutex_enter(&rdc_conf_lock);
	if (rdc_diskq_inuse(uparms->rdc_set, uparms->rdc_set->disk_queue)) {
		spcs_s_add(kstatus, RDC_EDISKQINUSE,
		    uparms->rdc_set->disk_queue);
		rc = RDC_EDISKQINUSE;
		mutex_exit(&rdc_conf_lock);
		goto failed;
	}
	mutex_exit(&rdc_conf_lock);

	rdc_group_enter(krdc);
	rc = _rdc_add_diskq(urdc->index, diskq);
	if (rc < 0 || rc == RDC_EQNOADD) {
		/* enable failed: fall back to a memory queue */
		group->flags &= ~RDC_DISKQUE;
		group->flags |= RDC_MEMQUE;
		spcs_s_add(kstatus, RDC_EQNOADD, uparms->rdc_set->disk_queue);
		rc = RDC_EQNOADD;
	}
	rdc_group_exit(krdc);
failed:
	return (rc);
}

/*
 * _rdc_init_diskq()
 * reset the on-disk queue header and stamp it out to disk.
 * Returns 0 on success, -1 if the stamp fails.
 * Caller is expected to hold QLOCK (see rdc_init_diskq()).
 */
int
_rdc_init_diskq(rdc_k_info_t *krdc)
{
	rdc_group_t *group = krdc->group;
	disk_queue *q = &group->diskq;

	rdc_init_diskq_header(group, &group->diskq.disk_hdr);
	SET_QNXTIO(q, QHEAD(q));

	if (rdc_stamp_diskq(krdc, 0, RDC_NOLOG) < 0)
		goto fail;

	return (0);
fail:
	return (-1);
}

/*
 * initialize the disk queue. This is a destructive
 * operation that will not check for emptiness of the queue.
 */
int
rdc_init_diskq(rdc_config_t *uparms, spcs_s_info_t kstatus)
{
	int rc = 0;
	int index;
	rdc_k_info_t *krdc, *kp;
	rdc_u_info_t *urdc, *up;
	rdc_set_t *uset;
	rdc_group_t *group;
	disk_queue *qp;

	uset = uparms->rdc_set;

	mutex_enter(&rdc_conf_lock);
	index = rdc_lookup_byname(uset);
	mutex_exit(&rdc_conf_lock);
	if (index < 0) {
		spcs_s_add(kstatus, RDC_EALREADY, uset->primary.file,
		    uset->secondary.file);
		rc = RDC_EALREADY;
		goto fail;
	}

	krdc = &rdc_k_info[index];
	urdc = &rdc_u_info[index];
	group = krdc->group;
	qp = &group->diskq;

	/* refuse to re-init while actively replicating */
	if (!IS_STATE(urdc, RDC_SYNCING) && !IS_STATE(urdc, RDC_LOGGING)) {
		spcs_s_add(kstatus, RDC_EQUEISREP, urdc->disk_queue);
		rc = RDC_EQUEISREP;
		goto fail;
	}

	/*
	 * a couple of big "ifs" here. in the first implementation
	 * neither of these will be possible. This will come into
	 * play when we persist the queue across reboots
	 */
	if (!(uparms->options & RDC_OPT_FORCE_QINIT)) {
		if (!QEMPTY(qp)) {
			if (group->rdc_writer) {
				spcs_s_add(kstatus, RDC_EQFLUSHING,
				    urdc->disk_queue);
				rc = RDC_EQFLUSHING;
			} else {
				spcs_s_add(kstatus, RDC_EQNOTEMPTY,
				    urdc->disk_queue);
				rc = RDC_EQNOTEMPTY;
			}
			goto fail;
		}
	}

	mutex_enter(QLOCK(qp));
	if (_rdc_init_diskq(krdc) < 0) {
		mutex_exit(QLOCK(qp));
		goto fail;
	}
	rdc_dump_iohdrs(qp);

	rdc_group_enter(krdc);

	/* the queue is empty again: clear QUEUING on every set in the group */
	rdc_clr_flags(urdc, RDC_QUEUING);
	for (kp = krdc->group_next; kp != krdc; kp = kp->group_next) {
		up = &rdc_u_info[kp->index];
		rdc_clr_flags(up, RDC_QUEUING);
	}
	rdc_group_exit(krdc);

	mutex_exit(QLOCK(qp));

	return (0);
fail:
	/* generic queue failure */
	if (!rc) {
		spcs_s_add(kstatus, RDC_EQINITFAIL, urdc->disk_queue);
		rc = RDC_EQINITFAIL;
	}

	return (rc);
}

/*
 * _rdc_kill_diskq()
 * forcibly tear down the group's disk queue: reset the header,
 * discard pending io headers, close the queue device, clear the
 * queue name on every set in the group, and switch the group back
 * to a memory queue.  Always returns 0.
 */
int
_rdc_kill_diskq(rdc_u_info_t *urdc)
{
	rdc_k_info_t *krdc = &rdc_k_info[urdc->index];
	rdc_group_t *group = krdc->group;
	disk_queue *q = &group->diskq;
	rdc_u_info_t *up;
	rdc_k_info_t *p;

	group->flags |= RDC_DISKQ_KILL;
#ifdef DEBUG
	cmn_err(CE_NOTE, "!disabling disk queue %s", urdc->disk_queue);
#endif

	mutex_enter(QLOCK(q));
	rdc_init_diskq_header(group, &q->disk_hdr);
	rdc_dump_iohdrs(q);

	/*
	 * nsc_close the queue and zero out the queue name
	 */
	rdc_wait_qbusy(q);
	rdc_close_diskq(group);
	mutex_exit(QLOCK(q));
	/* NOTE(review): SET_QSIZE is done after dropping QLOCK -- confirm */
	SET_QSIZE(q, 0);
	rdc_clr_flags(urdc, RDC_DISKQ_FAILED);
	bzero(urdc->disk_queue, NSC_MAXPATH);
	for (p = krdc->group_next; p != krdc; p = p->group_next) {
		up = &rdc_u_info[p->index];
		rdc_clr_flags(up, RDC_DISKQ_FAILED);
		bzero(up->disk_queue, NSC_MAXPATH);
	}

#ifdef DEBUG
	cmn_err(CE_NOTE, "!_rdc_kill_diskq: enabling memory queue");
#endif
	group->flags &= ~(RDC_DISKQUE|RDC_DISKQ_KILL);
	group->flags |= RDC_MEMQUE;
	return (0);
}

/*
 * remove this diskq regardless of whether it is draining or not
 * stops the flusher by invalidating the qdata (ie, instant empty)
 * remove the disk queue from the group, leaving the group with a memory
 * queue.
 */
int
rdc_kill_diskq(rdc_config_t *uparms, spcs_s_info_t kstatus)
{
	int rc;
	int index;
	rdc_u_info_t *urdc;
	rdc_k_info_t *krdc;
	rdc_set_t *rdc_set = uparms->rdc_set;

	mutex_enter(&rdc_conf_lock);
	index = rdc_lookup_byname(uparms->rdc_set);
	mutex_exit(&rdc_conf_lock);

	if (index < 0) {
		spcs_s_add(kstatus, RDC_EALREADY, rdc_set->primary.file,
		    rdc_set->secondary.file);
		rc = RDC_EALREADY;
		goto failed;
	}

	urdc = &rdc_u_info[index];
	krdc = &rdc_k_info[index];

	if (!RDC_IS_DISKQ(krdc->group)) {
		spcs_s_add(kstatus, RDC_EQNOQUEUE, rdc_set->primary.intf,
		    rdc_set->primary.file, rdc_set->secondary.intf,
		    rdc_set->secondary.file);
		rc = RDC_EQNOQUEUE;
		goto failed;
	}

	/*
	 * intentionally disabled logging check -- kill is allowed in
	 * any state:
	 *
	 * if (!IS_STATE(urdc, RDC_LOGGING)) {
	 *	spcs_s_add(kstatus, RDC_EQNOTLOGGING,
	 *	    uparms->rdc_set->disk_queue);
	 *	rc = RDC_EQNOTLOGGING;
	 *	goto failed;
	 * }
	 */
	rdc_unintercept_diskq(krdc->group); /* stop protecting queue */
	rdc_group_enter(krdc); /* to prevent further flushing */
	rc = _rdc_kill_diskq(urdc);
	rdc_group_exit(krdc);

failed:
	return (rc);
}

/*
 * remove a diskq from a group.
 * removal of a diskq from a set, or rather
 * a set from a queue, is done by reconfigging out
 * of the group. This removes the diskq from a whole
 * group and replaces it with a memory based queue
 *
 * Waits (in one second steps) for the queue to drain and the flusher
 * threads to exit; gives up after NUM_RETRIES intervals without
 * visible progress.  Returns 0 on success or an RDC_E* code.
 */
#define	NUM_RETRIES	15	/* Number of retries to wait if no progress */
int
rdc_rem_diskq(rdc_config_t *uparms, spcs_s_info_t kstatus)
{
	int index;
	rdc_u_info_t *urdc;
	rdc_k_info_t *krdc;
	rdc_k_info_t *this;
	volatile rdc_group_t *group;
	volatile disk_queue *diskq;
	int threads, counter;
	long blocks;

	mutex_enter(&rdc_conf_lock);
	index = rdc_lookup_byname(uparms->rdc_set);
	mutex_exit(&rdc_conf_lock);
	if (index < 0) {
		spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
		    uparms->rdc_set->secondary.file);
		return (RDC_EALREADY);
	}

	urdc = &rdc_u_info[index];
	this = &rdc_k_info[index];
	krdc = &rdc_k_info[index];

	/* every set in the group must be logging before we pull the queue */
	do {
		if (!IS_STATE(urdc, RDC_LOGGING)) {
			spcs_s_add(kstatus, RDC_EQNOTLOGGING,
			    urdc->disk_queue);
			return (RDC_EQNOTLOGGING);
		}
		krdc = krdc->group_next;
		urdc = &rdc_u_info[krdc->index];

	} while (krdc != this);

	/*
	 * If there is no group or diskq configured, we can leave now
	 * NOTE(review): &group->diskq can never be NULL, so the second
	 * half of this test is ineffective.
	 */
	if (!(group = krdc->group) || !(diskq = &group->diskq))
		return (0);


	/*
	 * Wait if not QEMPTY or threads still active
	 */
	counter = 0;
	while (!QEMPTY(diskq) || group->rdc_thrnum) {

		/*
		 * Capture counters to determine if progress is being made
		 */
		blocks = QBLOCKS(diskq);
		threads = group->rdc_thrnum;

		/*
		 * Wait one second
		 */
		delay(HZ);

		/*
		 * Has the group or disk queue gone away while delayed?
		 */
		if (!(group = krdc->group) || !(diskq = &group->diskq))
			return (0);

		/*
		 * Are we still seeing progress?
		 */
		if (blocks == QBLOCKS(diskq) && threads == group->rdc_thrnum) {
			/*
			 * No progress seen, increment the retry counter
			 */
			if (counter++ > NUM_RETRIES) {
				/*
				 * Too many intervals with no progress;
				 * give up and report why we are stuck.
				 */
				int rc = group->rdc_thrnum ?
				    RDC_EQFLUSHING : RDC_EQNOTEMPTY;
				spcs_s_add(kstatus, rc, urdc->disk_queue);
				return (rc);
			}
		} else {
			/*
			 * Reset counter, as we've made progress
			 */
			counter = 0;
		}
	}

	return (0);
}