/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

/* Network data replicator Client side */


#include <sys/types.h>
#include <sys/debug.h>
#include <sys/ksynch.h>
#include <sys/cmn_err.h>
#include <sys/kmem.h>
#include <sys/cred.h>
#include <sys/byteorder.h>
#include <sys/errno.h>

#ifdef _SunOS_2_6
/*
 * on 2.6 both dki_lock.h and rpc/types.h define bool_t so we
 * define enum_t here as it is all we need from rpc/types.h
 * anyway and make it look like we included it. Yuck.
 */
#define _RPC_TYPES_H
typedef int enum_t;
#else
#ifndef DS_DDICT
#include <rpc/types.h>
#endif
#endif /* _SunOS_2_6 */

#ifndef DS_DDICT
#include <rpc/auth.h>
#include <rpc/svc.h>
#include <rpc/xdr.h>
#endif
#include <sys/ddi.h>

#include <sys/nsc_thread.h>
#ifdef DS_DDICT
#include <sys/nsctl/contract.h>
#endif
#include <sys/nsctl/nsctl.h>

#include <sys/sdt.h>        /* dtrace is S10 or later */

#include "rdc_io.h"
#include "rdc_clnt.h"
#include "rdc_bitmap.h"
#include "rdc_diskq.h"


kmutex_t rdc_clnt_lock;

#ifdef DEBUG
int noflush = 0;
#endif

int rdc_rpc_tmout = RDC_CLNT_TMOUT;
static void rdc_clnt_free(struct chtab *, CLIENT *);
static void _rdc_remote_flush(rdc_aio_t *);

void rdc_flush_memq(int index);
void rdc_flush_diskq(int index);
int rdc_drain_net_queue(int index);
void rdc_flusher_thread(int index);
int rdc_diskq_enqueue(rdc_k_info_t *krdc, rdc_aio_t *);
void rdc_init_diskq_header(rdc_group_t *grp, dqheader *hd);
void rdc_dump_iohdrs(disk_queue *dq);
rdc_aio_t *rdc_dequeue(rdc_k_info_t *krdc, int *rc);
void rdc_clr_iohdr(rdc_k_info_t *krdc, nsc_off_t qpos);
void rdc_close_diskq(rdc_group_t *krdc);

int rdc_writer(int index);

static struct chtab *rdc_chtable = NULL;
static int rdc_clnt_toomany;
#ifdef DEBUG
static int rdc_ooreply;
#endif

extern void rdc_fail_diskq(rdc_k_info_t *krdc, int wait, int flag);
extern int _rdc_rsrv_diskq(rdc_group_t *group);
extern void _rdc_rlse_diskq(rdc_group_t *group);

static enum clnt_stat
cl_call_sig(struct __client *rh, rpcproc_t proc,
    xdrproc_t xargs, caddr_t argsp, xdrproc_t xres,
    caddr_t resp, struct timeval secs)
{
    enum clnt_stat stat;
    k_sigset_t smask;
    sigintr(&smask, 0);
    rh->cl_nosignal = TRUE;
    stat = ((*(rh)->cl_ops->cl_call)\
        (rh, proc, xargs, argsp, xres, resp, secs));
    rh->cl_nosignal = FALSE;
    sigunintr(&smask);
    return (stat);
}
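
/*
 * rdc_net_getsize
 *    Ask the remote end for the volume size (in FBAs) of this set.
 *    Protocol versions up to v5 only carry a 32-bit size (GETSIZE);
 *    later versions use the 64-bit GETSIZE6 variant.
 */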
int
rdc_net_getsize(int index, uint64_t *sizeptr)
{
    struct timeval t;
    int err, size;
    rdc_k_info_t *krdc = &rdc_k_info[index];
    int remote_index = krdc->remote_index;

    *sizeptr = 0;
    if (krdc->remote_index < 0)
        return (EINVAL);

    t.tv_sec = rdc_rpc_tmout;
    t.tv_usec = 0;

#ifdef DEBUG
    if (krdc->intf == NULL)
        cmn_err(CE_WARN,
            "!rdc_net_getsize: null intf for index %d", index);
#endif
    if (krdc->rpc_version <= RDC_VERSION5) {
        err = rdc_clnt_call(krdc->lsrv, RDCPROC_GETSIZE,
            krdc->rpc_version, xdr_int, (char *)&remote_index,
            xdr_int, (char *)&size, &t);
        if (err == 0)
            *sizeptr = size;
    } else {
        err = rdc_clnt_call(krdc->lsrv, RDCPROC_GETSIZE6,
            krdc->rpc_version, xdr_int, (char *)&remote_index,
            xdr_u_longlong_t, (char *)sizeptr, &t);
    }
    return (err);
}
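
/*
 * rdc_net_state
 *    Push the local set state (network addresses, volume names and the
 *    flag bits in `options') to the remote end.  Returns the remote
 *    index for this set, or -1 on error.  Pre-v7 protocols use the
 *    fixed-size set_state4 structure; v7 and later use the
 *    variable-length set_state form.
 */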
int
rdc_net_state(int index, int options)
{
    struct timeval t;
    int err;
    int remote_index = -1;
    rdc_u_info_t *urdc = &rdc_u_info[index];
    rdc_k_info_t *krdc = &rdc_k_info[index];
    struct set_state s;
    struct set_state4 s4;
    char neta[32], rneta[32];
    unsigned short *sp;

    t.tv_sec = rdc_rpc_tmout;
    t.tv_usec = 0;

    if (krdc->rpc_version < RDC_VERSION7) {
        s4.netaddrlen = urdc->primary.addr.len;
        s4.rnetaddrlen = urdc->secondary.addr.len;
        bcopy(urdc->primary.addr.buf, s4.netaddr, s4.netaddrlen);
        bcopy(urdc->secondary.addr.buf, s4.rnetaddr, s4.rnetaddrlen);
        (void) strncpy(s4.pfile, urdc->primary.file, RDC_MAXNAMLEN);
        (void) strncpy(s4.sfile, urdc->secondary.file, RDC_MAXNAMLEN);
        s4.flag = options;

        err = rdc_clnt_call(krdc->lsrv, RDCPROC_STATE,
            krdc->rpc_version, xdr_set_state4, (char *)&s4, xdr_int,
            (char *)&remote_index, &t);
    } else {
        s.netaddrlen = urdc->primary.addr.len;
        s.rnetaddrlen = urdc->secondary.addr.len;
        s.netaddr.buf = neta;
        s.rnetaddr.buf = rneta;
        bcopy(urdc->primary.addr.buf, s.netaddr.buf, s.netaddrlen);
        bcopy(urdc->secondary.addr.buf, s.rnetaddr.buf, s.rnetaddrlen);
        s.netaddr.len = urdc->primary.addr.len;
        s.rnetaddr.len = urdc->secondary.addr.len;
        s.netaddr.maxlen = urdc->primary.addr.len;
        s.rnetaddr.maxlen = urdc->secondary.addr.len;
        sp = (unsigned short *)s.netaddr.buf;
        *sp = htons(*sp);
        sp = (unsigned short *)s.rnetaddr.buf;
        *sp = htons(*sp);
        s.pfile = urdc->primary.file;
        s.sfile = urdc->secondary.file;
        s.flag = options;

        err = rdc_clnt_call(krdc->lsrv, RDCPROC_STATE,
            krdc->rpc_version, xdr_set_state, (char *)&s, xdr_int,
            (char *)&remote_index, &t);
    }

    if (err)
        return (-1);
    else
        return (remote_index);
}


/*
 * rdc_net_getbmap
 * gets the bitmaps from the remote side and ORs them into the local bitmap
 */
int
rdc_net_getbmap(int index, int size)
{
    struct timeval t;
    int err;
    struct bmap b;
    struct bmap6 b6;
    rdc_k_info_t *krdc;

    krdc = &rdc_k_info[index];

    if (krdc->remote_index < 0)
        return (EINVAL);

    t.tv_sec = rdc_rpc_tmout;
    t.tv_usec = 0;
#ifdef DEBUG
    if (krdc->intf == NULL)
        cmn_err(CE_WARN,
            "!rdc_net_getbmap: null intf for index %d", index);
#endif

    if (krdc->rpc_version <= RDC_VERSION5) {
        b.cd = krdc->remote_index;
        b.dual = index;
        b.size = size;
        err = rdc_clnt_call(krdc->lsrv, RDCPROC_BMAP,
            krdc->rpc_version, xdr_bmap, (char *)&b, xdr_int,
            (char *)&err, &t);

    } else {
        b6.cd = krdc->remote_index;
        b6.dual = index;
        b6.size = size;
        err = rdc_clnt_call(krdc->lsrv, RDCPROC_BMAP6,
            krdc->rpc_version, xdr_bmap6, (char *)&b6, xdr_int,
            (char *)&err, &t);
    }
    return (err);
}

int sndr_proto = 0;

/*
 * return state corresponding to rdc_host
 */
int
rdc_net_getstate(rdc_k_info_t *krdc, int *serial_mode, int *use_mirror,
    int *mirror_down, int network)
{
    int err;
    struct timeval t;
    int state;
    rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
    struct set_state s;
#ifdef sparc
    struct set_state4 s4;
#endif
    char neta[32];
    char rneta[32];
    unsigned short *sp;
    char *setp = (char *)&s;
    xdrproc_t xdr_proc = xdr_set_state;

    if (krdc->lsrv && (krdc->intf == NULL || krdc->intf->if_down) &&
        network) /* fail fast */
        return (-1);

    s.netaddrlen = urdc->primary.addr.len;
    s.rnetaddrlen = urdc->secondary.addr.len;
    s.pfile = urdc->primary.file;
    s.sfile = urdc->secondary.file;
    s.netaddr.buf = neta;
    s.rnetaddr.buf = rneta;
    bcopy(urdc->primary.addr.buf, s.netaddr.buf, s.netaddrlen);
    bcopy(urdc->secondary.addr.buf, s.rnetaddr.buf, s.rnetaddrlen);
    sp = (unsigned short *)s.netaddr.buf;
    *sp = htons(*sp);
    sp = (unsigned short *)s.rnetaddr.buf;
    *sp = htons(*sp);
    s.netaddr.len = urdc->primary.addr.len;
    s.rnetaddr.len = urdc->secondary.addr.len;
    s.netaddr.maxlen = urdc->primary.addr.maxlen;
    s.rnetaddr.maxlen = urdc->secondary.addr.maxlen;
    s.flag = 0;

    t.tv_sec = rdc_rpc_tmout;
    t.tv_usec = 0;

    if (sndr_proto)
        krdc->rpc_version = sndr_proto;
    else
        krdc->rpc_version = RDC_VERS_MAX;

again:
    err = rdc_clnt_call(krdc->lsrv, RDCPROC_GETSTATE4, krdc->rpc_version,
        xdr_proc, setp, xdr_int, (char *)&state, &t);

    if (err == RPC_PROGVERSMISMATCH && (krdc->rpc_version !=
        RDC_VERS_MIN)) {
        if (krdc->rpc_version-- == RDC_VERSION7) {
            /* set_state struct changed with v7 of protocol */
#ifdef sparc
            s4.netaddrlen = urdc->primary.addr.len;
            s4.rnetaddrlen = urdc->secondary.addr.len;
            bcopy(urdc->primary.addr.buf, s4.netaddr,
                s4.netaddrlen);
            bcopy(urdc->secondary.addr.buf, s4.rnetaddr,
                s4.rnetaddrlen);
            (void) strncpy(s4.pfile, urdc->primary.file,
                RDC_MAXNAMLEN);
            (void) strncpy(s4.sfile, urdc->secondary.file,
                RDC_MAXNAMLEN);
            s4.flag = 0;
            xdr_proc = xdr_set_state4;
            setp = (char *)&s4;
#else
            /* x64 cannot use protocols < 7 */
            return (-1);
#endif
        }
        goto again;
    }
#ifdef DEBUG
    cmn_err(CE_NOTE, "!sndr get_state: Protocol ver %d", krdc->rpc_version);
#endif

    if (err) {
        return (-1);
    }

    if (state == -1)
        return (-1);

    if (serial_mode)
        *serial_mode = (state >> 2) & 1;
    if (use_mirror)
        *use_mirror = (state >> 1) & 1;
    if (mirror_down)
        *mirror_down = state & 1;

    return (0);
}


static struct xdr_discrim rdres_discrim[2] = {
    { (int)RDC_OK, xdr_readok },
    { __dontcare__, NULL_xdrproc_t }
};


/*
 * Reply from remote read (client side)
 */
static bool_t
xdr_rdresult(XDR *xdrs, readres *rr)
{

    return (xdr_union(xdrs, (enum_t *)&(rr->rr_status),
        (caddr_t)&(rr->rr_ok), rdres_discrim, xdr_void));
}
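
/*
 * rdc_rrstatus_decode
 *    Map an RDC remote-read status into a local errno: ENOENT or
 *    ENOMEM for the corresponding RDC errors, EIO for anything else
 *    that is not RDC_OK.
 */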
static int
rdc_rrstatus_decode(int status)
{
    int ret = 0;

    if (status != RDC_OK) {
        switch (status) {
        case RDCERR_NOENT:
            ret = ENOENT;
            break;
        case RDCERR_NOMEM:
            ret = ENOMEM;
            break;
        default:
            ret = EIO;
            break;
        }
    }

    return (ret);
}


int
rdc_net_read(int local_index, int remote_index, nsc_buf_t *handle,
    nsc_off_t fba_pos, nsc_size_t fba_len)
{
    struct rdcrdresult rr;
    rdc_k_info_t *krdc;
    rdc_u_info_t *urdc;
    struct rread list;
    struct rread6 list6;
    struct timeval t;
    uchar_t *sv_addr;
    nsc_vec_t *vec;
    int rpc_flag;
    nsc_size_t sv_len;
    int err;
    int ret;
    nsc_size_t len;
    nsc_size_t maxfbas;
    int transflag;

    if (handle == NULL)
        return (EINVAL);

    if (!RDC_HANDLE_LIMITS(handle, fba_pos, fba_len)) {
#ifdef DEBUG
        cmn_err(CE_NOTE, "!rdc_net_read: handle bounds");
#endif
        return (EINVAL);
    }

    krdc = &rdc_k_info[local_index];
    urdc = &rdc_u_info[local_index];

    maxfbas = MAX_RDC_FBAS;

    if (krdc->remote_fd && !(rdc_get_vflags(urdc) & RDC_FCAL_FAILED)) {
        nsc_buf_t *remote_h = NULL;
        int reserved = 0;

        ret = nsc_reserve(krdc->remote_fd, NSC_MULTI);
        if (RDC_SUCCESS(ret)) {
            reserved = 1;
            ret = nsc_alloc_buf(krdc->remote_fd, fba_pos, fba_len,
                NSC_RDBUF, &remote_h);
        }
        if (RDC_SUCCESS(ret)) {
            ret = nsc_copy(remote_h, handle, fba_pos, fba_pos,
                fba_len);
            if (RDC_SUCCESS(ret)) {
                (void) nsc_free_buf(remote_h);
                nsc_release(krdc->remote_fd);
                return (0);
            }
        }
        rdc_group_enter(krdc);
        rdc_set_flags(urdc, RDC_FCAL_FAILED);
        rdc_group_exit(krdc);
        if (remote_h)
            (void) nsc_free_buf(remote_h);
        if (reserved)
            nsc_release(krdc->remote_fd);
    }

    t.tv_sec = rdc_rpc_tmout;
    t.tv_usec = 0;

    if (rdc_get_vflags(urdc) & RDC_VOL_FAILED)
        rpc_flag = RDC_RREAD_FAIL;
    else
        rpc_flag = 0;

#ifdef DEBUG
    if (krdc->intf == NULL)
        cmn_err(CE_WARN,
            "!rdc_net_read: null intf for index %d", local_index);
#endif
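    /*
     * The remote read is a three-step exchange: a setup RPC
     * (RDC_RREAD_START) that asks the server to allocate and fill a
     * buffer, a series of data-transfer RPCs (RDC_RREAD_DATA) that
     * fetch it in maxfbas-sized chunks, and a final chunk flagged
     * RDC_RREAD_END that lets the server clean up its state.
     */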
    /*
     * switch on proto version.
     */
    len = fba_len;            /* length (FBAs) still to xfer */
    rr.rr_bufsize = 0;        /* rpc data buffer length (bytes) */
    rr.rr_data = NULL;        /* rpc data buffer */
    transflag = rpc_flag | RDC_RREAD_START;    /* setup rpc */
    if (krdc->rpc_version <= RDC_VERSION5) {
        ASSERT(fba_pos <= INT32_MAX);
        list.pos = (int)fba_pos;    /* fba position of start of chunk */
        list.cd = remote_index;        /* remote end cd */
        /* send setup rpc */
        list.flag = transflag;
        ASSERT(len <= INT32_MAX);
        list.len = (int)len;        /* total fba length */
        err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ5,
            krdc->rpc_version, xdr_rread, (char *)&list, xdr_int,
            (char *)&ret, &t);

    } else {
        list6.pos = fba_pos;        /* fba position of start of chunk */
        list6.cd = remote_index;    /* remote end cd */
        /* send setup rpc */
        list6.flag = transflag;        /* setup rpc */
        ASSERT(len <= INT32_MAX);
        list6.len = (int)len;        /* total fba length */
        err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ6,
            krdc->rpc_version, xdr_rread6, (char *)&list6, xdr_int,
            (char *)&ret, &t);
    }

    if (err) {
#ifdef DEBUG
        cmn_err(CE_NOTE, "!rdc_net_read: setup err %d", err);
#endif
        if (err == RPC_INTR)
            ret = EINTR;
        else
            ret = ENOLINK;

        goto remote_rerror;
    }

    if (ret == 0) {        /* No valid index from r_net_read */
#ifdef DEBUG
        cmn_err(CE_NOTE,
            "!rdc_net_read: no valid index from r_net_read");
#endif
        return (ENOBUFS);
    }
    transflag = rpc_flag | RDC_RREAD_DATA;
    if (krdc->rpc_version <= RDC_VERSION5) {
        list.idx = ret;        /* save idx to return to server */
        list.flag = transflag;
        /* move onto to data xfer rpcs */
    } else {
        list6.idx = ret;    /* save idx to return to server */
        list6.flag = transflag;
    }

    /* find starting position in handle */

    vec = handle->sb_vec;

    fba_pos -= handle->sb_pos;

    for (; fba_pos >= FBA_NUM(vec->sv_len); vec++)
        fba_pos -= FBA_NUM(vec->sv_len);

    sv_addr = vec->sv_addr + FBA_SIZE(fba_pos);    /* data in vector */
    sv_len = vec->sv_len - FBA_SIZE(fba_pos);    /* bytes in vector */

    while (len) {
        nsc_size_t translen;
        if (len > maxfbas) {
            translen = maxfbas;
        } else {
            translen = len;
        }

        if (FBA_SIZE(translen) > sv_len) {
            translen = FBA_NUM(sv_len);
        }

        len -= translen;
        if (len == 0) {
            /* last data xfer rpc - tell server to cleanup */
            transflag |= RDC_RREAD_END;
        }

        if (!rr.rr_data || (nsc_size_t)rr.rr_bufsize !=
            FBA_SIZE(translen)) {
            if (rr.rr_data)
                kmem_free(rr.rr_data, rr.rr_bufsize);

            ASSERT(FBA_SIZE(translen) <= INT32_MAX);
            rr.rr_bufsize = FBA_SIZE(translen);
            rr.rr_data = kmem_alloc(rr.rr_bufsize, KM_NOSLEEP);
        }

        if (!rr.rr_data) {
            /* error */
#ifdef DEBUG
            cmn_err(CE_NOTE, "!rdc_net_read: kmem_alloc failed");
#endif
            return (ENOMEM);
        }

        /* get data from remote end */

#ifdef DEBUG
        if (krdc->intf == NULL)
            cmn_err(CE_WARN,
                "!rdc_net_read: null intf for index %d",
                local_index);
#endif
        if (krdc->io_kstats) {
            mutex_enter(krdc->io_kstats->ks_lock);
            kstat_runq_enter(KSTAT_IO_PTR(krdc->io_kstats));
            mutex_exit(krdc->io_kstats->ks_lock);
        }
        /*CONSTCOND*/
        ASSERT(RDC_MAXDATA <= INT32_MAX);
        ASSERT(translen <= RDC_MAXDATA);
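        /* fetch this chunk; on success the reply data lands in rr.rr_data */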
        if (krdc->rpc_version <= RDC_VERSION5) {
            list.len = (int)translen;
            list.flag = transflag;
            err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ5,
                krdc->rpc_version, xdr_rread, (char *)&list,
                xdr_rdresult, (char *)&rr, &t);
        } else {
            list6.len = (int)translen;
            list6.flag = transflag;
            err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ6,
                krdc->rpc_version, xdr_rread6, (char *)&list6,
                xdr_rdresult, (char *)&rr, &t);
        }

        if (krdc->io_kstats) {
            mutex_enter(krdc->io_kstats->ks_lock);
            kstat_runq_exit(KSTAT_IO_PTR(krdc->io_kstats));
            mutex_exit(krdc->io_kstats->ks_lock);
        }

        if (err) {
#ifdef DEBUG
            cmn_err(CE_NOTE, "!rdc_net_read: rpc err %d", err);
#endif
            if (err == RPC_INTR) {
                ret = EINTR;
            } else {
                ret = ENOLINK;
            }

            goto remote_rerror;
        }

        if (rr.rr_status != RDC_OK) {
            ret = rdc_rrstatus_decode(rr.rr_status);
            if (!ret)
                ret = EIO;

            goto remote_rerror;
        }

        /* copy into handle */

        bcopy(rr.rr_data, sv_addr, (size_t)rr.rr_bufsize);

        /* update counters */

        sv_addr += rr.rr_bufsize;
        if (krdc->rpc_version <= RDC_VERSION5) {
            list.pos += translen;
        } else {
            list6.pos += translen;
        }
        if (krdc->io_kstats) {
            KSTAT_IO_PTR(krdc->io_kstats)->reads++;
            KSTAT_IO_PTR(krdc->io_kstats)->nread += rr.rr_bufsize;
        }
        ASSERT(sv_len <= INT32_MAX);
        ASSERT(sv_len >= (nsc_size_t)rr.rr_bufsize);
        sv_len -= rr.rr_bufsize;

        if (sv_len == 0) {
            /* goto next vector */
            vec++;
            sv_addr = vec->sv_addr;
            sv_len = vec->sv_len;
        }
    }

    if (rr.rr_data)
        kmem_free(rr.rr_data, rr.rr_bufsize);

    return (0);

remote_rerror:
    if (rr.rr_data)
        kmem_free(rr.rr_data, rr.rr_bufsize);

    return (ret ? ret : ENOLINK);
}
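
/*
 * Usage sketch (illustrative only, not part of the original source):
 * a caller owns an nsc_buf_t that spans [pos, pos + len) and lets
 * rdc_net_read() fill it from the remote volume, e.g.
 *
 *    nsc_buf_t *h = NULL;
 *    rc = nsc_alloc_buf(RDC_U_FD(krdc), pos, len, NSC_RDBUF, &h);
 *    if (RDC_SUCCESS(rc))
 *        rc = rdc_net_read(krdc->index, krdc->remote_index,
 *            h, pos, len);
 *    if (h)
 *        (void) nsc_free_buf(h);
 */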

/*
 * rdc_net_write
 * Main remote write client side
 * Handles protocol selection as well as requests for remote allocation
 * and data transfer
 * Does local IO for FCAL
 * caller must clear bitmap on success
 */

int
rdc_net_write(int local_index, int remote_index, nsc_buf_t *handle,
    nsc_off_t fba_pos, nsc_size_t fba_len, uint_t aseq, int qpos,
    netwriteres *netres)
{
    rdc_k_info_t *krdc;
    rdc_u_info_t *urdc;
    struct timeval t;
    nsc_vec_t *vec;
    int sv_len;
    nsc_off_t fpos;
    int err;
    struct netwriteres netret;
    struct netwriteres *netresptr;
    struct net_data5 dlist5;
    struct net_data6 dlist6;
    int ret;
    nsc_size_t maxfbas;
    int transflag;
    int translen;
    int transendoblk;
    char *transptr;
    int vflags;

    if (handle == NULL)
        return (EINVAL);

    /* if not a diskq buffer */
    if ((qpos == -1) && (!RDC_HANDLE_LIMITS(handle, fba_pos, fba_len))) {
#ifdef DEBUG
        cmn_err(CE_NOTE, "!rdc_net_write: handle bounds");
#endif
        return (EINVAL);
    }


    t.tv_sec = rdc_rpc_tmout;
    t.tv_usec = 0;

    krdc = &rdc_k_info[local_index];
    urdc = &rdc_u_info[local_index];

    maxfbas = MAX_RDC_FBAS;

    /* FCAL IO */
    if (krdc->remote_fd && !(rdc_get_vflags(urdc) & RDC_FCAL_FAILED)) {
        nsc_buf_t *remote_h = NULL;
        int reserved = 0;

        ret = nsc_reserve(krdc->remote_fd, NSC_MULTI);
        if (RDC_SUCCESS(ret)) {
            reserved = 1;
            ret = nsc_alloc_buf(krdc->remote_fd, fba_pos, fba_len,
                NSC_WRBUF, &remote_h);
        }
        if (RDC_SUCCESS(ret)) {
            ret = nsc_copy(handle, remote_h, fba_pos, fba_pos,
                fba_len);
            if (RDC_SUCCESS(ret))
                ret = nsc_write(remote_h, fba_pos, fba_len, 0);
            if (RDC_SUCCESS(ret)) {
                (void) nsc_free_buf(remote_h);
                nsc_release(krdc->remote_fd);
                return (0);
            }
        }
        rdc_group_enter(krdc);
        rdc_set_flags(urdc, RDC_FCAL_FAILED);
        rdc_group_exit(krdc);
        if (remote_h)
            (void) nsc_free_buf(remote_h);
        if (reserved)
            nsc_release(krdc->remote_fd);
    }

    /*
     * At this point we must decide which protocol we are using and
     * do the right thing
     */
    netret.vecdata.vecdata_val = NULL;
    netret.vecdata.vecdata_len = 0;
    if (netres) {
        netresptr = netres;
    } else {
        netresptr = &netret;
    }

    vflags = rdc_get_vflags(urdc);

    if (vflags & (RDC_VOL_FAILED|RDC_BMP_FAILED))
        transflag = RDC_RWRITE_FAIL;
    else
        transflag = 0;


#ifdef DEBUG
    if (krdc->intf == NULL)
        cmn_err(CE_WARN, "!rdc_net_write: null intf for index %d",
            local_index);
#endif

    vec = handle->sb_vec;

    /*
     * find starting position in vector
     */
    if ((qpos == -1) || (handle->sb_user == RDC_NULLBUFREAD))
        fpos = fba_pos - handle->sb_pos;
    else
        fpos = (qpos + 1) - handle->sb_pos;

    for (; fpos >= FBA_NUM(vec->sv_len); vec++)
        fpos -= FBA_NUM(vec->sv_len);
    sv_len = vec->sv_len - FBA_SIZE(fpos);    /* bytes in vector */
    transptr = (char *)vec->sv_addr + FBA_SIZE(fpos);

    if (krdc->rpc_version <= RDC_VERSION5) {
        dlist5.local_cd = local_index;
        dlist5.cd = remote_index;
        ASSERT(fba_len <= INT32_MAX);
        ASSERT(fba_pos <= INT32_MAX);
        dlist5.len = (int)fba_len;
        dlist5.pos = (int)fba_pos;
        dlist5.idx = -1;        /* Starting index */
        dlist5.flag = transflag;
        dlist5.seq = aseq;        /* sequence number */
        dlist5.sfba = (int)fba_pos;    /* starting fba for this xfer */
    } else {
        dlist6.local_cd = local_index;
        dlist6.cd = remote_index;
        ASSERT(fba_len <= INT32_MAX);
        dlist6.len = (int)fba_len;
        dlist6.qpos = qpos;
        dlist6.pos = fba_pos;
        dlist6.idx = -1;        /* Starting index */
        dlist6.flag = transflag;
        dlist6.seq = aseq;        /* sequence number */
        dlist6.sfba = fba_pos;        /* starting fba for this xfer */
    }

    transendoblk = 0;
    while (fba_len) {
        if (!transptr) {
#ifdef DEBUG
            cmn_err(CE_WARN,
                "!rdc_net_write: walked off end of handle!");
#endif
            ret = EINVAL;
            goto remote_error;
        }

        if (fba_len > maxfbas) {
            ASSERT(maxfbas <= INT32_MAX);
            translen = (int)maxfbas;
        } else {
            ASSERT(fba_len <= INT32_MAX);
            translen = (int)fba_len;
        }

        if (FBA_SIZE(translen) > sv_len) {
            translen = FBA_NUM(sv_len);
        }

        fba_len -= translen;
        if (fba_len == 0) {
            /* last data xfer - tell server to commit */
            transendoblk = 1;
        }


#ifdef DEBUG
        if (krdc->intf == NULL)
            cmn_err(CE_WARN,
                "!rdc_net_write: null intf for index %d",
                local_index);
#endif
        DTRACE_PROBE(rdc_netwrite_clntcall_start);

        if (krdc->io_kstats) {
            mutex_enter(krdc->io_kstats->ks_lock);
            kstat_runq_enter(KSTAT_IO_PTR(krdc->io_kstats));
            mutex_exit(krdc->io_kstats->ks_lock);
        }
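        /*
         * Ship this chunk.  A v5 server replies with a plain int (the
         * server-side index, or a negative error); v6 and later return
         * a full netwriteres, which may also carry a vector of
         * previously pending writes that have now completed.
         */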
        if (krdc->rpc_version <= RDC_VERSION5) {
            ret = 0;
            dlist5.nfba = translen;
            dlist5.endoblk = transendoblk;
            dlist5.data.data_len = FBA_SIZE(translen);
            dlist5.data.data_val = transptr;
            err = rdc_clnt_call(krdc->lsrv, RDCPROC_WRITE5,
                krdc->rpc_version, xdr_net_data5,
                (char *)&dlist5, xdr_int,
                (char *)&ret, &t);
            if (ret >= 0) {
                netresptr->result = 0;
                netresptr->index = ret;
            } else {
                netresptr->result = ret;
            }
        } else {
            netresptr->result = 0;
            dlist6.nfba = translen;
            dlist6.endoblk = transendoblk;
            dlist6.data.data_len = FBA_SIZE(translen);
            dlist6.data.data_val = transptr;
            err = rdc_clnt_call(krdc->lsrv, RDCPROC_WRITE6,
                krdc->rpc_version, xdr_net_data6,
                (char *)&dlist6, xdr_netwriteres,
                (char *)netresptr, &t);
        }

        if (krdc->io_kstats) {
            mutex_enter(krdc->io_kstats->ks_lock);
            kstat_runq_exit(KSTAT_IO_PTR(krdc->io_kstats));
            mutex_exit(krdc->io_kstats->ks_lock);
        }

        DTRACE_PROBE(rdc_netwrite_clntcall_end);
        ret = netresptr->result;
        if (err) {
            if (err == RPC_INTR)
                ret = EINTR;
            else if (err && ret != EPROTO)
                ret = ENOLINK;
#ifdef DEBUG
            cmn_err(CE_NOTE,
                "!rdc_net_write(5): cd %d err %d ret %d",
                remote_index, err, ret);
#endif
            goto remote_error;
        }
        /* Error from r_net_write5 */
        if (netresptr->result < 0) {
#ifdef DEBUG
            cmn_err(CE_NOTE,
                "!rdc_net_write: r_net_write(5) "
                "returned: %d",
                -netresptr->result);
#endif
            ret = -netresptr->result;
            if (netret.vecdata.vecdata_val)
                kmem_free(netret.vecdata.vecdata_val,
                    netret.vecdata.vecdata_len *
                    sizeof (net_pendvec_t));
            goto remote_error;
        } else if (netresptr->index == 0) {
#ifdef DEBUG
            cmn_err(CE_NOTE,
                "!rdc_net_write: no valid index from "
                "r_net_write(5)");
#endif
            ret = ENOBUFS;
            if (netret.vecdata.vecdata_val)
                kmem_free(netret.vecdata.vecdata_val,
                    netret.vecdata.vecdata_len *
                    sizeof (net_pendvec_t));
            goto remote_error;
        }
        if (krdc->rpc_version <= RDC_VERSION5) {
            dlist5.idx = netresptr->index;
            dlist5.sfba += dlist5.nfba;
        } else {
            dlist6.idx = netresptr->index;
            dlist6.sfba += dlist6.nfba;
        }
        /* update counters */
        if (krdc->io_kstats) {
            KSTAT_IO_PTR(krdc->io_kstats)->writes++;
            KSTAT_IO_PTR(krdc->io_kstats)->nwritten +=
                FBA_SIZE(translen);
        }
        transptr += FBA_SIZE(translen);
        sv_len -= FBA_SIZE(translen);

        if (sv_len <= 0) {
            /* goto next vector */
            vec++;
            transptr = (char *)vec->sv_addr;
            sv_len = vec->sv_len;
        }
    }
    /*
     * this can't happen.....
     */
    if (netret.vecdata.vecdata_val)
        kmem_free(netret.vecdata.vecdata_val,
            netret.vecdata.vecdata_len *
            sizeof (net_pendvec_t));

    return (0);

remote_error:
    return (ret ? ret : ENOLINK);
}
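
/*
 * rdc_fixlen
 *    Recompute the queue handle's sb_len from its io vector chain,
 *    once the last user of the handle is done with it, so the buffer
 *    is freed with the correct length.
 */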
void
rdc_fixlen(rdc_aio_t *aio)
{
    nsc_vec_t *vecp = aio->qhandle->sb_vec;
    nsc_size_t len = 0;

    while (vecp->sv_addr) {
        len += FBA_NUM(vecp->sv_len);
        vecp++;
    }
    aio->qhandle->sb_len = len;
}

/*
 * rdc_dump_alloc_bufs_cd
 * Dump allocated buffers (rdc_net_hnd's) for the specified cd.
 * this could be the flusher failing, if so, don't do the delay forever
 * Returns: 0 (success), EAGAIN (caller needs to try again).
 */
int
rdc_dump_alloc_bufs_cd(int index)
{
    rdc_k_info_t *krdc;
    rdc_aio_t *aio;
    net_queue *q;
    disk_queue *dq;
    kmutex_t *qlock;

    krdc = &rdc_k_info[index];


    if (!krdc->c_fd) {
        /* cannot do anything! */
#ifdef DEBUG
        cmn_err(CE_WARN, "!rdc_dump_alloc_bufs_cd(%d): c_fd NULL",
            index);
#endif
        return (0);
    }
    rdc_dump_dsets(index);

    dq = &krdc->group->diskq;

    if (RDC_IS_DISKQ(krdc->group)) {
        qlock = QLOCK(dq);
        (void) _rdc_rsrv_diskq(krdc->group);
    } else {
        qlock = &krdc->group->ra_queue.net_qlock;
    }

    /*
     * Now dump the async queue anonymous buffers
     * if we are a diskq, then we are using the diskq mutex.
     * However, we are flushing from diskq to memory queue
     * so we now need to grab the memory lock also
     */

    q = &krdc->group->ra_queue;

    if (RDC_IS_DISKQ(krdc->group)) {
        mutex_enter(&q->net_qlock);
        if (q->qfill_sleeping == RDC_QFILL_AWAKE) {
            int tries = 5;
#ifdef DEBUG_DISKQ
            cmn_err(CE_NOTE,
                "!dumpalloccd sending diskq->memq flush to sleep");
#endif
            q->qfflags |= RDC_QFILLSLEEP;
            mutex_exit(&q->net_qlock);

            while (q->qfill_sleeping == RDC_QFILL_AWAKE && tries--)
                delay(5);
            mutex_enter(&q->net_qlock);
        }
    }

    mutex_enter(qlock);

    while ((q->net_qhead != NULL)) {
        rdc_k_info_t *tmpkrdc;
        aio = q->net_qhead;
        tmpkrdc = &rdc_k_info[aio->index];

        if (RDC_IS_DISKQ(krdc->group)) {
            aio->qhandle->sb_user--;
            if (aio->qhandle->sb_user == 0) {
                rdc_fixlen(aio);
                (void) nsc_free_buf(aio->qhandle);
                aio->qhandle = NULL;
                aio->handle = NULL;
            }
        } else {
            if (aio->handle) {
                (void) nsc_free_buf(aio->handle);
                aio->handle = NULL;
            }
        }

        if (tmpkrdc->io_kstats && !RDC_IS_DISKQ(krdc->group)) {
            mutex_enter(tmpkrdc->io_kstats->ks_lock);
            kstat_waitq_exit(KSTAT_IO_PTR(tmpkrdc->io_kstats));
            mutex_exit(tmpkrdc->io_kstats->ks_lock);
        }
        q->net_qhead = q->net_qhead->next;
        q->blocks -= aio->len;
        q->nitems--;

        RDC_CHECK_BIT(tmpkrdc, aio->pos, aio->len);

        kmem_free(aio, sizeof (*aio));
    }
    q->net_qtail = NULL;

    if (krdc->group->asyncstall) {
        krdc->group->asyncdis = 1;
        cv_broadcast(&krdc->group->asyncqcv);
    }
    if (krdc->group->sleepq) {
        rdc_sleepqdiscard(krdc->group);
    }

    krdc->group->seq = RDC_NEWSEQ;
    krdc->group->seqack = RDC_NEWSEQ;
    if (RDC_IS_DISKQ(krdc->group)) {
        rdc_dump_iohdrs(dq);
        SET_QNXTIO(dq, QHEAD(dq));
        SET_QCOALBOUNDS(dq, QHEAD(dq));
    }
    mutex_exit(qlock);

    if (RDC_IS_DISKQ(krdc->group)) {
        mutex_exit(&q->net_qlock);
        _rdc_rlse_diskq(krdc->group);
    }

    return (0);
}


/*
 * rdc_dump_alloc_bufs
 * We have an error on the link
 * Try to dump all of the allocated bufs so we can cleanly recover
 * and not hang
 */
void
rdc_dump_alloc_bufs(rdc_if_t *ip)
{
    rdc_k_info_t *krdc;
    int repeat;
    int index;

    for (index = 0; index < rdc_max_sets; index++) {
        do {
            krdc = &rdc_k_info[index];
            repeat = 0;
            if (krdc->intf == ip) {
                if (rdc_dump_alloc_bufs_cd(index) == EAGAIN) {
                    repeat = 1;
                    delay(2);
                }
            }
        } while (repeat);
    }
}

/*
 * returns 1 if the throttle should throttle, 0 if not.
 */
int
_rdc_diskq_isfull(disk_queue *q, long len)
{
    /* ---T----H----N--- */
    mutex_enter(QLOCK(q));

    if (FITSONQ(q, len + 1)) {
        mutex_exit(QLOCK(q));
        return (0);
    }
    mutex_exit(QLOCK(q));
    return (1);
}
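
/*
 * _rdc_async_throttle
 *    Hold the caller back while the async queue is over its limits.
 *    For a memory queue this spins (with short delays) until the
 *    block and item counts, including those in flight, drop below
 *    maxqfbas / maxqitems; for a disk queue it waits for space, and if
 *    the queue cannot drain it fails the diskq and drops the set into
 *    logging mode.  All sets in a one-to-many configuration are
 *    checked.
 */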
void
_rdc_async_throttle(rdc_k_info_t *this, long len)
{
    rdc_k_info_t *krdc;
    rdc_u_info_t *urdc;
    int print_msg = 1;
    int tries = RDC_FUTILE_ATTEMPTS;

    /*
     * Throttle entries on queue
     */

    /* Need to take the 1-many case into account, checking all sets */

    /* ADD HANDY HEURISTIC HERE TO SLOW DOWN IO */
    for (krdc = this; /* CSTYLED */; krdc = krdc->many_next) {
        urdc = &rdc_u_info[krdc->index];

        /*
         * this may be the last set standing in a one to many setup.
         * we may also be stuck in unintercept, after marking
         * the volume as not enabled, but have not removed it
         * from the many list resulting in an endless loop if
         * we just continue here. Let's jump over this stuff
         * and check to see if we are the only dude here.
         */
        if (!IS_ENABLED(urdc))
            goto thischeck;

        if (IS_ASYNC(urdc) && RDC_IS_MEMQ(krdc->group)) {
            net_queue *q = &krdc->group->ra_queue;
            while ((q->blocks + q->inflbls) > urdc->maxqfbas ||
                (q->nitems + q->inflitems) > urdc->maxqitems) {

                if (!IS_ENABLED(urdc)) /* disable race */
                    goto thischeck;

                if (!krdc->group->rdc_writer)
                    (void) rdc_writer(krdc->index);
                delay(2);
                q->throttle_delay++;
            }
        }

        /* do a much more aggressive delay, get disk flush going */
        if (IS_ASYNC(urdc) && RDC_IS_DISKQ(krdc->group)) {
            disk_queue *q = &krdc->group->diskq;
            while ((!IS_QSTATE(q, RDC_QNOBLOCK)) &&
                (_rdc_diskq_isfull(q, len)) &&
                (!IS_STATE(urdc, RDC_DISKQ_FAILED))) {
                if (print_msg) {
                    cmn_err(CE_WARN, "!rdc async throttle:"
                        " disk queue %s full",
                        &urdc->disk_queue[0]);

                    print_msg = 0;
                }
                if (!IS_ENABLED(urdc)) /* disable race */
                    goto thischeck;

                if (!krdc->group->rdc_writer)
                    (void) rdc_writer(krdc->index);
                delay(10);
                q->throttle_delay += 10;

                if (!(tries--) && IS_STATE(urdc, RDC_QUEUING)) {
                    cmn_err(CE_WARN, "!SNDR: disk queue "
                        "%s full & not flushing. giving up",
                        &urdc->disk_queue[0]);
                    cmn_err(CE_WARN, "!SNDR: %s:%s entering"
                        " logging mode",
                        urdc->secondary.intf,
                        urdc->secondary.file);
                    rdc_fail_diskq(krdc, RDC_WAIT,
                        RDC_DOLOG | RDC_NOFAIL);
                    mutex_enter(QLOCK(q));
                    cv_broadcast(&q->qfullcv);
                    mutex_exit(QLOCK(q));
                }

            }
            if ((IS_QSTATE(q, RDC_QNOBLOCK)) &&
                _rdc_diskq_isfull(q, len) &&
                !IS_STATE(urdc, RDC_DISKQ_FAILED)) {
                if (print_msg) {
                    cmn_err(CE_WARN, "!disk queue %s full",
                        &urdc->disk_queue[0]);
                    print_msg = 0;
                }
                rdc_fail_diskq(krdc, RDC_WAIT,
                    RDC_DOLOG | RDC_NOFAIL);
                mutex_enter(QLOCK(q));
                cv_broadcast(&q->qfullcv);
                mutex_exit(QLOCK(q));
            }
        }

    thischeck:
        if (krdc->many_next == this)
            break;
    }
}

int rdc_coalesce = 1;
static int rdc_joins = 0;
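
/*
 * rdc_aio_coalesce
 *    Try to merge `new' with the queued I/O it abuts: allocate an
 *    anonymous buffer covering both, copy the data across, fix up the
 *    bitmap reference counts, and extend the queued entry.  Returns 1
 *    if the two were merged, 0 otherwise.
 */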
int
rdc_aio_coalesce(rdc_aio_t *queued, rdc_aio_t *new)
{
    nsc_buf_t *h = NULL;
    int rc;
    rdc_k_info_t *krdc;
    uint_t bitmask;

    if (rdc_coalesce == 0)
        return (0);        /* don't even try */

    if ((queued == NULL) ||
        (queued->handle == NULL) ||
        (new->handle == NULL)) {
        return (0);        /* existing queue is empty */
    }
    if (queued->index != new->index || queued->len + new->len >
        MAX_RDC_FBAS) {
        return (0);        /* I/O too big */
    }
    if ((queued->pos + queued->len == new->pos) ||
        (new->pos + new->len == queued->pos)) {
        rc = nsc_alloc_abuf(queued->pos, queued->len + new->len, 0,
            &h);
        if (!RDC_SUCCESS(rc)) {
            if (h != NULL)
                (void) nsc_free_buf(h);
            return (0);        /* couldn't do coalesce */
        }
        rc = nsc_copy(queued->handle, h, queued->pos, queued->pos,
            queued->len);
        if (!RDC_SUCCESS(rc)) {
            (void) nsc_free_buf(h);
            return (0);        /* couldn't do coalesce */
        }
        rc = nsc_copy(new->handle, h, new->pos, new->pos,
            new->len);
        if (!RDC_SUCCESS(rc)) {
            (void) nsc_free_buf(h);
            return (0);        /* couldn't do coalesce */
        }

        krdc = &rdc_k_info[queued->index];

        RDC_SET_BITMASK(queued->pos, queued->len, &bitmask);
        RDC_CLR_BITMAP(krdc, queued->pos, queued->len, \
            bitmask, RDC_BIT_BUMP);

        RDC_SET_BITMASK(new->pos, new->len, &bitmask);
        RDC_CLR_BITMAP(krdc, new->pos, new->len, \
            bitmask, RDC_BIT_BUMP);

        (void) nsc_free_buf(queued->handle);
        (void) nsc_free_buf(new->handle);
        queued->handle = h;
        queued->len += new->len;
        bitmask = 0;
        /*
         * bump the ref count back up
         */

        RDC_SET_BITMAP(krdc, queued->pos, queued->len, &bitmask);
        return (1);    /* new I/O succeeds last I/O queued */
    }
    return (0);
}

int
rdc_memq_enqueue(rdc_k_info_t *krdc, rdc_aio_t *aio)
{
    net_queue *q;
    rdc_group_t *group;

    group = krdc->group;
    q = &group->ra_queue;

    mutex_enter(&q->net_qlock);

    if (rdc_aio_coalesce(q->net_qtail, aio)) {
        rdc_joins++;
        q->blocks += aio->len;
        kmem_free(aio, sizeof (*aio));
        goto out;
    }
    aio->seq = group->seq++;
    if (group->seq < aio->seq)
        group->seq = RDC_NEWSEQ + 1; /* skip magics */

    if (q->net_qhead == NULL) {
        /* adding to empty q */
        q->net_qhead = q->net_qtail = aio;

#ifdef DEBUG
        if (q->blocks != 0 || q->nitems != 0) {
            cmn_err(CE_PANIC,
                "rdc enqueue: q %p, qhead 0, q blocks %" NSC_SZFMT
                ", nitems %" NSC_SZFMT,
                (void *) q, q->blocks, q->nitems);
        }
#endif

    } else {
        /* discontiguous, add aio to q tail */
        q->net_qtail->next = aio;
        q->net_qtail = aio;
    }

    q->blocks += aio->len;
    q->nitems++;

    if (krdc->io_kstats) {
        mutex_enter(krdc->io_kstats->ks_lock);
        kstat_waitq_enter(KSTAT_IO_PTR(krdc->io_kstats));
        mutex_exit(krdc->io_kstats->ks_lock);
    }
out:
#ifdef DEBUG
    /* sum the q and check for sanity */
    {
        nsc_size_t qblocks = 0;
        uint64_t nitems = 0;
        rdc_aio_t *a;

        for (a = q->net_qhead; a != NULL; a = a->next) {
            qblocks += a->len;
            nitems++;
        }

        if (qblocks != q->blocks || nitems != q->nitems) {
            cmn_err(CE_PANIC,
                "rdc enqueue: q %p, q blocks %" NSC_SZFMT " (%"
                NSC_SZFMT "), nitems %" NSC_SZFMT " (%"
                NSC_SZFMT ")", (void *) q, q->blocks, qblocks,
                q->nitems, nitems);
        }
    }
#endif

    mutex_exit(&q->net_qlock);

    if (q->nitems > q->nitems_hwm) {
        q->nitems_hwm = q->nitems;
    }

    if (q->blocks > q->blocks_hwm) {
        q->blocks_hwm = q->blocks;
    }

    if (!krdc->group->rdc_writer)
        (void) rdc_writer(krdc->index);

    return (0);
}
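
/*
 * _rdc_enqueue_write
 *    Wrap a write in an rdc_aio_t and route it to the memory or disk
 *    queue for this group.  For a disk queue the aio is copied in and
 *    freed here; for a memory queue rdc_memq_enqueue() takes
 *    ownership.
 */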
int
_rdc_enqueue_write(rdc_k_info_t *krdc, nsc_off_t pos, nsc_size_t len, int flag,
    nsc_buf_t *h)
{
    rdc_aio_t *aio;
    rdc_group_t *group;
    rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
    int rc;

    aio = kmem_zalloc(sizeof (*aio), KM_NOSLEEP);
    if (!aio) {
        return (ENOMEM);
    }

    group = krdc->group;

    aio->pos = pos;
    aio->qpos = -1;
    aio->len = len;
    aio->flag = flag;
    aio->index = krdc->index;
    aio->handle = h;

    if (group->flags & RDC_MEMQUE) {
        return (rdc_memq_enqueue(krdc, aio));
    } else if ((group->flags & RDC_DISKQUE) &&
        !IS_STATE(urdc, RDC_DISKQ_FAILED)) {
        rc = rdc_diskq_enqueue(krdc, aio);
        kmem_free(aio, sizeof (*aio));
        return (rc);
    }
    return (-1); /* keep lint quiet */
}




/*
 * Async Network RDC flusher
 */

/*
 * don't allow any new writer threads to start if a member of the set
 * is disable pending
 */
int
is_disable_pending(rdc_k_info_t *krdc)
{
    rdc_k_info_t *this = krdc;
    int rc = 0;

    do {
        if (krdc->type_flag & RDC_DISABLEPEND) {
            krdc = this;
            rc = 1;
            break;
        }
        krdc = krdc->group_next;

    } while (krdc != this);

    return (rc);
}

/*
 * rdc_writer -- spawn new writer if not running already
 * called after enqueueing the dirty blocks
 */
int
rdc_writer(int index)
{
    rdc_k_info_t *krdc = &rdc_k_info[index];
    nsthread_t *t;
    rdc_group_t *group;
    kmutex_t *qlock;
    int tries;
    const int MAX_TRIES = 16;

    group = krdc->group;

    if (RDC_IS_DISKQ(group))
        qlock = &group->diskq.disk_qlock;
    else
        qlock = &group->ra_queue.net_qlock;

    mutex_enter(qlock);

#ifdef DEBUG
    if (noflush) {
        mutex_exit(qlock);
        return (0);
    }
#endif

    if ((group->rdc_writer) || is_disable_pending(krdc)) {
        mutex_exit(qlock);
        return (0);
    }

    if ((group->rdc_thrnum >= 1) && (group->seqack == RDC_NEWSEQ)) {
        /*
         * We also need to check if we are starting a new
         * sequence, and if so don't create a new thread,
         * as we must ensure that the start of new sequence
         * requests arrives first to re-init the server.
         */
        mutex_exit(qlock);
        return (0);
    }
    /*
     * For version 6,
     * see if we can fit in another thread.
     */
    group->rdc_thrnum++;

    if (krdc->intf && (krdc->intf->rpc_version >= RDC_VERSION6)) {
        rdc_u_info_t *urdc = &rdc_u_info[index];
        if (group->rdc_thrnum >= urdc->asyncthr)
            group->rdc_writer = 1;
    } else {
        group->rdc_writer = 1;
    }

    mutex_exit(qlock);


    /*
     * If we got here, we know that we have not exceeded the allowed
     * number of async threads for our group.  If we run out of threads
     * in _rdc_flset, we add a new thread to the set.
     */
    tries = 0;
    do {
        /* first try to grab a thread from the free list */
        if (t = nst_create(_rdc_flset, rdc_flusher_thread,
            (blind_t)(unsigned long)index, 0)) {
            break;
        }

        /* that failed; add a thread to the set and try again */
        if (nst_add_thread(_rdc_flset, 1) != 1) {
            cmn_err(CE_WARN, "!rdc_writer index %d nst_add_thread "
                "error, tries: %d", index, tries);
            break;
        }
    } while (++tries < MAX_TRIES);

    if (tries) {
        mutex_enter(&group->addthrnumlk);
        group->rdc_addthrnum += tries;
        mutex_exit(&group->addthrnumlk);
    }

    if (t) {
        return (1);
    }

    cmn_err(CE_WARN, "!rdc_writer: index %d nst_create error", index);
    rdc_many_enter(krdc);
    mutex_enter(qlock);
    group->rdc_thrnum--;
    group->rdc_writer = 0;
    if ((group->count == 0) && (group->rdc_thrnum == 0)) {
        mutex_exit(qlock);
        /*
         * Race with remove_from_group while write thread was
         * failing to be created.
         */
#ifdef DEBUG
        cmn_err(CE_WARN, "!rdc_writer: group being destroyed");
#endif
        rdc_delgroup(group);
        krdc->group = NULL;
        rdc_many_exit(krdc);
        return (-1);
    }
    mutex_exit(qlock);
    rdc_many_exit(krdc);
    return (-1);
}

/*
 * Either we need to flush the kmem (net_queue) queue or the disk
 * (disk_queue) queue; determine which, and do it.
 */
void
rdc_flusher_thread(int index)
{
    rdc_k_info_t *krdc = &rdc_k_info[index];

    if (krdc->group->flags & RDC_MEMQUE) {
        rdc_flush_memq(index);
        return;
    } else if (krdc->group->flags & RDC_DISKQUE) {
        rdc_flush_diskq(index);
        return;
    } else { /* uh-oh, big time */
        cmn_err(CE_PANIC, "flusher trying to flush unknown queue type");
    }

}
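
/*
 * rdc_flush_memq
 *    Writer thread body for a memory queue: pull rdc_aio_t entries off
 *    the head of the queue and push each one to the remote end via
 *    _rdc_remote_flush(), until the queue is empty, the link fails, or
 *    a sequence-number reset forces this thread to retire.
 */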
void
rdc_flush_memq(int index)
{
    rdc_k_info_t *krdc = &rdc_k_info[index];
    rdc_aio_t *aio;
    net_queue *q;
    int dowork;
    rdc_group_t *group = krdc->group;
    if (!group || group->count == 0) {
#ifdef DEBUG
        cmn_err(CE_WARN, "!rdc_flush_memq: no group left!");
#endif
        return;
    }

    if (!krdc->c_fd) {
#ifdef DEBUG
        cmn_err(CE_WARN, "!rdc_flush_memq: no c_fd!");
#endif
        goto thread_death;
    }

#ifdef DEBUG_DISABLE
    if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) {
        cmn_err(CE_WARN, "!rdc_flush_memq: DISABLE PENDING!");
        /*
         * Need to continue as we may be trying to flush IO
         * while trying to disable or suspend
         */
    }
#endif

    q = &group->ra_queue;

    dowork = 1;
    /* CONSTCOND */
    while (dowork) {
        if (net_exit == ATM_EXIT)
            break;

        group = krdc->group;
        if (!group || group->count == 0) {
#ifdef DEBUG
            cmn_err(CE_WARN, "!rdc_flush_memq: no group left!");
#endif
            break;
        }

        mutex_enter(&q->net_qlock);
        aio = q->net_qhead;

        if (aio == NULL) {
#ifdef DEBUG
            if (q->nitems != 0 ||
                q->blocks != 0 ||
                q->net_qtail != 0) {
                cmn_err(CE_PANIC,
                    "rdc_flush_memq(1): q %p, q blocks %"
                    NSC_SZFMT ", nitems %" NSC_SZFMT
                    ", qhead %p qtail %p",
                    (void *) q, q->blocks, q->nitems,
                    (void *) aio, (void *) q->net_qtail);
            }
#endif
            mutex_exit(&q->net_qlock);
            break;
        }

        /* aio remove from q */

        q->net_qhead = aio->next;
        aio->next = NULL;

        if (q->net_qtail == aio)
            q->net_qtail = q->net_qhead;

        q->blocks -= aio->len;
        q->nitems--;

        /*
         * in flight numbers.
         */
        q->inflbls += aio->len;
        q->inflitems++;

#ifdef DEBUG
        if (q->net_qhead == NULL) {
            if (q->nitems != 0 ||
                q->blocks != 0 ||
                q->net_qtail != 0) {
                cmn_err(CE_PANIC,
                    "rdc_flush_memq(2): q %p, q blocks %"
                    NSC_SZFMT ", nitems %" NSC_SZFMT
                    ", qhead %p qtail %p",
                    (void *) q, q->blocks, q->nitems,
                    (void *) q->net_qhead,
                    (void *) q->net_qtail);
            }
        }

#ifndef NSC_MULTI_TERABYTE
        if (q->blocks < 0) {
            cmn_err(CE_PANIC,
                "rdc_flush_memq(3): q %p, q blocks %" NSC_SZFMT
                ", nitems %d, qhead %p, qtail %p",
                (void *) q, q->blocks, q->nitems,
                (void *) q->net_qhead, (void *) q->net_qtail);
        }
#else
        /* blocks and nitems are unsigned for NSC_MULTI_TERABYTE */
#endif
#endif

        mutex_exit(&q->net_qlock);

        aio->iostatus = RDC_IO_INIT;

        _rdc_remote_flush(aio);

        mutex_enter(&q->net_qlock);
        q->inflbls -= aio->len;
        q->inflitems--;
        if ((group->seqack == RDC_NEWSEQ) &&
            (group->seq != RDC_NEWSEQ + 1)) {
            if ((q->net_qhead == NULL) ||
                (q->net_qhead->seq != RDC_NEWSEQ + 1)) {
                /*
                 * We are an old thread, and the
                 * queue sequence has been reset
                 * during the network write above.
                 * As such we mustn't pull another
                 * job from the queue until the
                 * first sequence message has been ack'ed.
                 * Just die instead. Unless this thread
                 * is the first sequence that has just
                 * been ack'ed
                 */
                dowork = 0;
            }
        }
        mutex_exit(&q->net_qlock);

        if ((aio->iostatus != RDC_IO_DONE) && (group->count)) {
            rdc_k_info_t *krdctmp = &rdc_k_info[aio->index];
            if (krdctmp->type_flag & RDC_DISABLEPEND) {
                kmem_free(aio, sizeof (*aio));
                goto thread_death;
            }
            rdc_group_enter(krdc);
            ASSERT(krdc->group);
            rdc_group_log(krdc, RDC_NOFLUSH | RDC_ALLREMOTE,
                "memq flush aio status not RDC_IO_DONE");
            rdc_group_exit(krdc);
            rdc_dump_queue(aio->index);
        }
        kmem_free(aio, sizeof (*aio));

        if (krdc->remote_index < 0 || !krdc->lsrv || !krdc->intf)
            break;
    }

thread_death:
    rdc_many_enter(krdc);
    mutex_enter(&group->ra_queue.net_qlock);
    group->rdc_thrnum--;
    group->rdc_writer = 0;
    /*
     * all threads must be dead.
     */
    if ((group->count == 0) && (group->rdc_thrnum == 0)) {
        mutex_exit(&group->ra_queue.net_qlock);
        /*
         * Group now empty, so destroy
         * Race with remove_from_group while write thread was running
         */
#ifdef DEBUG
        cmn_err(CE_WARN, "!rdc_flush_memq: group being destroyed");
#endif
        rdc_delgroup(group);
        krdc->group = NULL;
        rdc_many_exit(krdc);
        return;
    }
    mutex_exit(&group->ra_queue.net_qlock);
    rdc_many_exit(krdc);
}

/*
 * rdc_flush_diskq
 * disk queue flusher
 */
void
rdc_flush_diskq(int index)
{
    rdc_k_info_t *krdc = &rdc_k_info[index];
    rdc_u_info_t *urdc = &rdc_u_info[index];
    rdc_aio_t *aio = NULL;
    disk_queue *q;
    net_queue *nq;
    int dowork;
    int rc;
    rdc_group_t *group = krdc->group;

    if (!group || group->count == 0) {
#ifdef DEBUG
        cmn_err(CE_WARN, "!rdc_flush_diskq: no group left!");
#endif
        return;
    }

    if (!krdc->c_fd) {
#ifdef DEBUG
        cmn_err(CE_WARN, "!rdc_flush_diskq: no c_fd!");
#endif
        return;
    }

#ifdef DEBUG_DISABLE
    if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) {
        cmn_err(CE_WARN, "!rdc_flush_diskq: DISABLE PENDING!");
        /*
         * Need to continue as we may be trying to flush IO
         * while trying to disable or suspend
         */
    }
#endif
    q = &group->diskq;
    nq = &group->ra_queue;

    if (IS_QSTATE(q, RDC_QDISABLEPEND) || IS_STATE(urdc, RDC_LOGGING)) {
#ifdef DEBUG
        cmn_err(CE_NOTE, "!flusher thread death 1 %x", QSTATE(q));
#endif
        goto thread_death;
    }

    dowork = 1;
    /* CONSTCOND */
    while (dowork) {
        if (net_exit == ATM_EXIT)
            break;

        group = krdc->group;
        if (!group || group->count == 0) {
#ifdef DEBUG
            cmn_err(CE_WARN, "!rdc_flush_diskq: no group left!");
#endif
            break;
        }

        do {
            rc = 0;
            if ((IS_STATE(urdc, RDC_LOGGING)) ||
                (IS_STATE(urdc, RDC_SYNCING)) ||
                (nq->qfflags & RDC_QFILLSLEEP))
                goto thread_death;

            aio = rdc_dequeue(krdc, &rc);

            if ((IS_STATE(urdc, RDC_LOGGING)) ||
                (IS_STATE(urdc, RDC_SYNCING)) ||
                (nq->qfflags & RDC_QFILLSLEEP)) {
                goto thread_death;
            }
            if (rc == EAGAIN) {
                delay(40);
            }

        } while (rc == EAGAIN);

        if (aio == NULL) {
            break;
        }
        aio->iostatus = RDC_IO_INIT;

        mutex_enter(QLOCK(q));
        q->inflbls += aio->len;
        q->inflitems++;
        mutex_exit(QLOCK(q));

        _rdc_remote_flush(aio);

        mutex_enter(QLOCK(q));
        q->inflbls -= aio->len;
        q->inflitems--;

        if ((group->seqack == RDC_NEWSEQ) &&
            (group->seq != RDC_NEWSEQ + 1)) {
            if ((nq->net_qhead == NULL) ||
                (nq->net_qhead->seq != RDC_NEWSEQ + 1)) {
                /*
                 * We are an old thread, and the
                 * queue sequence has been reset
                 * during the network write above.
                 * As such we mustn't pull another
                 * job from the queue until the
                 * first sequence message has been ack'ed.
                 * Just die instead. Unless of course,
                 * this thread is the first sequence that
                 * has just been ack'ed.
                 */
                dowork = 0;
            }
        }
        mutex_exit(QLOCK(q));

        if (aio->iostatus == RDC_IO_CANCELLED) {
            rdc_dump_queue(aio->index);
            kmem_free(aio, sizeof (*aio));
            aio = NULL;
            if (group) { /* seq gets bumped on dequeue */
                mutex_enter(QLOCK(q));
                rdc_dump_iohdrs(q);
                SET_QNXTIO(q, QHEAD(q));
                SET_QCOALBOUNDS(q, QHEAD(q));
                group->seq = RDC_NEWSEQ;
                group->seqack = RDC_NEWSEQ;
                mutex_exit(QLOCK(q));
            }
            break;
        }

        if ((aio->iostatus != RDC_IO_DONE) && (group->count)) {
            rdc_k_info_t *krdctmp = &rdc_k_info[aio->index];
            if (krdctmp->type_flag & RDC_DISABLEPEND) {
                kmem_free(aio, sizeof (*aio));
                aio = NULL;
                goto thread_death;
            }
            rdc_group_enter(krdc);
            rdc_group_log(krdc,
                RDC_NOFLUSH | RDC_ALLREMOTE | RDC_QUEUING,
                "diskq flush aio status not RDC_IO_DONE");
            rdc_group_exit(krdc);
            rdc_dump_queue(aio->index);
        }

        kmem_free(aio, sizeof (*aio));
        aio = NULL;

#ifdef DEBUG_DISABLE
        if (krdc->type_flag & RDC_DISABLEPEND) {
            cmn_err(CE_WARN,
                "!rdc_flush_diskq: DISABLE PENDING after IO!");
        }
#endif
        if (krdc->remote_index < 0 || !krdc->lsrv || !krdc->intf)
            break;

        if (IS_QSTATE(q, RDC_QDISABLEPEND)) {
#ifdef DEBUG
            cmn_err(CE_NOTE, "!flusher thread death 2");
#endif
            break;
        }
    }
thread_death:
    rdc_many_enter(krdc);
    mutex_enter(QLOCK(q));
    group->rdc_thrnum--;
    group->rdc_writer = 0;

    if (aio && aio->qhandle) {
        aio->qhandle->sb_user--;
        if (aio->qhandle->sb_user == 0) {
            (void) _rdc_rsrv_diskq(krdc->group);
            rdc_fixlen(aio);
            (void) nsc_free_buf(aio->qhandle);
            aio->qhandle = NULL;
            aio->handle = NULL;
            _rdc_rlse_diskq(krdc->group);
        }
    }
    if ((group->count == 0) && (group->rdc_thrnum == 0)) {
        mutex_exit(QLOCK(q));
        /*
         * Group now empty, so destroy
         * Race with remove_from_group while write thread was running
         */
#ifdef DEBUG
        cmn_err(CE_WARN, "!rdc_flush_diskq: group being destroyed");
#endif
        mutex_enter(&group->diskqmutex);
        rdc_close_diskq(group);
        mutex_exit(&group->diskqmutex);
        rdc_delgroup(group);
        krdc->group = NULL;
        rdc_many_exit(krdc);
        return;
    }
    mutex_exit(QLOCK(q));
    rdc_many_exit(krdc);
}

/*
 * _rdc_remote_flush
 * Flush a single ANON block
 * this function will flush from either the disk queue
 * or the memory queue. The appropriate locks must be
 * taken out etc, etc ...
 */
static void
_rdc_remote_flush(rdc_aio_t *aio)
{
    rdc_k_info_t *krdc = &rdc_k_info[aio->index];
    rdc_u_info_t *urdc = &rdc_u_info[aio->index];
    disk_queue *q = &krdc->group->diskq;
    kmutex_t *qlock;
    rdc_group_t *group;
    nsc_buf_t *h = NULL;
    int reserved = 0;
    int rtype = RDC_RAW;
    int rc;
    uint_t maxseq;
    struct netwriteres netret;
    int waitq = 1;
    int vflags;

    group = krdc->group;
    netret.vecdata.vecdata_val = NULL;
    netret.vecdata.vecdata_len = 0;

    /* Where did we get this aio from anyway? */
    if (RDC_IS_DISKQ(group)) {
        qlock = &group->diskq.disk_qlock;
    } else {
        qlock = &group->ra_queue.net_qlock;
    }

    /*
     * quench transmission if we are too far ahead of the
     * server Q, or it will overflow.
     * Must fail all requests while asyncdis is set.
     * It will be cleared when the last thread to be discarded
     * sets the asyncstall counter to zero.
     * Note the thread within rdc_net_write
     * also bumps the asyncstall counter.
     */

    mutex_enter(qlock);
    if (group->asyncdis) {
        aio->iostatus = RDC_IO_CANCELLED;
        mutex_exit(qlock);
        goto failed;
    }
    /* don't go to sleep if we have gone logging! */
    vflags = rdc_get_vflags(urdc);
    if ((vflags & (RDC_BMP_FAILED|RDC_VOL_FAILED|RDC_LOGGING))) {
        if ((vflags & RDC_LOGGING) && RDC_IS_DISKQ(group))
            aio->iostatus = RDC_IO_CANCELLED;

        mutex_exit(qlock);
        goto failed;
    }
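    /*
     * Wait until our sequence number falls inside the window the
     * server can accept: seqack plus RDC_MAXPENDQ, allowing for the
     * counter wrapping back past RDC_NEWSEQ.
     */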
    while (maxseq = group->seqack + RDC_MAXPENDQ + 1,
        maxseq = (maxseq < group->seqack) ? maxseq + RDC_NEWSEQ + 1
        : maxseq, !RDC_INFRONT(aio->seq, maxseq)) {
        group->asyncstall++;
        ASSERT(!IS_STATE(urdc, RDC_LOGGING));
        cv_wait(&group->asyncqcv, qlock);
        group->asyncstall--;
        ASSERT(group->asyncstall >= 0);
        if (group->asyncdis) {
            if (group->asyncstall == 0) {
                group->asyncdis = 0;
            }
            aio->iostatus = RDC_IO_CANCELLED;
            mutex_exit(qlock);
            goto failed;
        }
        /*
         * See if we have gone into logging mode
         * since sleeping.
         */
        vflags = rdc_get_vflags(urdc);
        if (vflags & (RDC_BMP_FAILED|RDC_VOL_FAILED|RDC_LOGGING)) {
            if ((vflags & RDC_LOGGING) && RDC_IS_DISKQ(group))
                aio->iostatus = RDC_IO_CANCELLED;

            mutex_exit(qlock);
            goto failed;
        }
    }
    mutex_exit(qlock);

    if ((krdc->io_kstats) && (!RDC_IS_DISKQ(krdc->group))) {
        mutex_enter(krdc->io_kstats->ks_lock);
        kstat_waitq_exit(KSTAT_IO_PTR(krdc->io_kstats));
        mutex_exit(krdc->io_kstats->ks_lock);
        waitq = 0;
    }


    rc = _rdc_rsrv_devs(krdc, rtype, RDC_INTERNAL);
    if (rc != 0) {
#ifdef DEBUG
        cmn_err(CE_WARN, "!_rdc_remote_flush: reserve, index %d, rc %d",
            aio->index, rc);
#endif
        goto failed;
    }

    reserved = 1;
    /*
     * Case where we are multihop and calling with no ANON bufs
     * Need to do the read to fill the buf.
     */
    if (!aio->handle) {
        rc = nsc_alloc_buf(RDC_U_FD(krdc), aio->pos, aio->len,
            (aio->flag & ~NSC_WRITE) | NSC_READ, &h);
        if (!RDC_SUCCESS(rc)) {
#ifdef DEBUG
            cmn_err(CE_WARN,
                "!_rdc_remote_flush: alloc_buf, index %d, pos %"
                NSC_SZFMT ", len %" NSC_SZFMT ", rc %d",
                aio->index, aio->pos, aio->len, rc);
#endif

            goto failed;
        }
        aio->handle = h;
        aio->handle->sb_user = RDC_NULLBUFREAD;
    }
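    /*
     * Note: RDC_NULLBUFREAD in sb_user marks a handle that was
     * allocated and filled by the local read above; the success and
     * failure paths below use it to know the handle must be freed
     * here rather than with the queue.
     */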
    mutex_enter(qlock);
    if (group->asyncdis) {
        if (group->asyncstall == 0) {
            group->asyncdis = 0;
        }
        aio->iostatus = RDC_IO_CANCELLED;
        mutex_exit(qlock);
        goto failed;
    }
    group->asyncstall++;
    mutex_exit(qlock);


    if (krdc->remote_index < 0) {
        /*
         * this should be ok, we are flushing, not rev syncing.
         * remote_index could be -1 if we lost a race with
         * resume and the flusher tries to flush an io from
         * another set that has not resumed
         */
        krdc->remote_index = rdc_net_state(krdc->index, CCIO_SLAVE);
        DTRACE_PROBE1(remote_index_negative, int, krdc->remote_index);

    }

    /*
     * double check for logging, no check in net_write()
     * skip the write if you can, otherwise, if logging
     * avoid clearing the bit .. you don't know whose bit it may
     * also be.
     */
    if (IS_STATE(urdc, RDC_LOGGING) || IS_STATE(urdc, RDC_SYNCING)) {
        aio->iostatus = RDC_IO_CANCELLED;
        mutex_enter(qlock);
        group->asyncstall--;
        mutex_exit(qlock);
        goto failed;
    }

    rc = rdc_net_write(krdc->index, krdc->remote_index,
        aio->handle, aio->pos, aio->len, aio->seq, aio->qpos, &netret);

    mutex_enter(qlock);
    group->asyncstall--;
    if (group->asyncdis) {
        if (group->asyncstall == 0) {
            group->asyncdis = 0;
        }
        aio->iostatus = RDC_IO_CANCELLED;
        mutex_exit(qlock);
        goto failed;
    }

    if (IS_STATE(urdc, RDC_LOGGING) || IS_STATE(urdc, RDC_SYNCING)) {
        mutex_exit(qlock);
        aio->iostatus = RDC_IO_CANCELLED;
        goto failed;
    }

    ASSERT(aio->handle);
    if (rc != 0) {
#ifdef DEBUG
        cmn_err(CE_WARN,
            "!_rdc_remote_flush: write, index %d, pos %" NSC_SZFMT
            ", len %" NSC_SZFMT ", "
            "rc %d seq %u group seq %u seqack %u qpos %" NSC_SZFMT,
            aio->index, aio->pos, aio->len, rc, aio->seq,
            group->seq, group->seqack, aio->qpos);
#endif
        if (rc == ENOLINK) {
            cmn_err(CE_WARN,
                "!Hard timeout detected (%d sec) "
                "on SNDR set %s:%s",
                rdc_rpc_tmout, urdc->secondary.intf,
                urdc->secondary.file);
        }
        mutex_exit(qlock);
        goto failed;
    } else {
        aio->iostatus = RDC_IO_DONE;
    }

    if (RDC_IS_DISKQ(group)) {
        /* free locally alloc'd handle */
        if (aio->handle->sb_user == RDC_NULLBUFREAD) {
            (void) nsc_free_buf(aio->handle);
            aio->handle = NULL;
        }
        aio->qhandle->sb_user--;
        if (aio->qhandle->sb_user == 0) {
            (void) _rdc_rsrv_diskq(group);
            rdc_fixlen(aio);
            (void) nsc_free_buf(aio->qhandle);
            aio->qhandle = NULL;
            aio->handle = NULL;
            _rdc_rlse_diskq(group);
        }

    } else {
        (void) nsc_free_buf(aio->handle);
        aio->handle = NULL;
    }

    mutex_exit(qlock);

    _rdc_rlse_devs(krdc, rtype);
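    /*
     * On a successful reply, clear the bitmap bits this write covered
     * and advance the acknowledged-sequence window; replies that
     * arrive out of order must not move seqack backwards.
     */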
0xffffffff, RDC_BIT_BUMP); 2275 2276 if (RDC_IS_DISKQ(krdc->group)) { 2277 if (!IS_STATE(urdc, RDC_LOGGING)) { 2278 /* tell queue data has been flushed */ 2279 rdc_clr_iohdr(krdc, aio->qpos); 2280 } else { /* throw away queue, logging */ 2281 mutex_enter(qlock); 2282 rdc_dump_iohdrs(q); 2283 SET_QNXTIO(q, QHEAD(q)); 2284 SET_QCOALBOUNDS(q, QHEAD(q)); 2285 mutex_exit(qlock); 2286 } 2287 } 2288 } 2289 2290 mutex_enter(qlock); 2291 /* 2292 * Check to see if the reply has arrived out of 2293 * order, if so don't update seqack. 2294 */ 2295 if (!RDC_INFRONT(aio->seq, group->seqack)) { 2296 group->seqack = aio->seq; 2297 } 2298 #ifdef DEBUG 2299 else { 2300 rdc_ooreply++; 2301 } 2302 #endif 2303 if (group->asyncstall) { 2304 cv_broadcast(&group->asyncqcv); 2305 } 2306 mutex_exit(qlock); 2307 } else if (netret.result < 0) { 2308 aio->iostatus = RDC_IO_FAILED; 2309 } 2310 2311 /* 2312 * see if we have any pending async requests we can mark 2313 * as done. 2314 */ 2315 2316 if (netret.vecdata.vecdata_len) { 2317 net_pendvec_t *vecp; 2318 net_pendvec_t *vecpe; 2319 vecp = netret.vecdata.vecdata_val; 2320 vecpe = netret.vecdata.vecdata_val + netret.vecdata.vecdata_len; 2321 while (vecp < vecpe) { 2322 rdc_k_info_t *krdcp = &rdc_k_info[vecp->pindex]; 2323 rdc_u_info_t *urdcp = &rdc_u_info[vecp->pindex]; 2324 /* 2325 * we must always still be in the same group. 2326 */ 2327 ASSERT(krdcp->group == group); 2328 vflags = rdc_get_vflags(urdcp); 2329 2330 if (!(vflags & 2331 (RDC_BMP_FAILED|RDC_VOL_FAILED|RDC_LOGGING))) { 2332 RDC_CLR_BITMAP(krdcp, vecp->apos, vecp->alen, \ 2333 0xffffffff, RDC_BIT_BUMP); 2334 if (RDC_IS_DISKQ(krdcp->group)) { 2335 if (!IS_STATE(urdcp, RDC_LOGGING)) { 2336 /* update queue info */ 2337 rdc_clr_iohdr(krdcp, vecp->qpos); 2338 } else { /* we've gone logging */ 2339 mutex_enter(qlock); 2340 rdc_dump_iohdrs(q); 2341 SET_QNXTIO(q, QHEAD(q)); 2342 SET_QCOALBOUNDS(q, QHEAD(q)); 2343 mutex_exit(qlock); 2344 } 2345 } 2346 } 2347 2348 /* 2349 * see if we can re-start transmission 2350 */ 2351 mutex_enter(qlock); 2352 if (!RDC_INFRONT(vecp->seq, group->seqack)) { 2353 group->seqack = vecp->seq; 2354 } 2355 #ifdef DEBUG 2356 else { 2357 rdc_ooreply++; 2358 } 2359 #endif 2360 DTRACE_PROBE1(pendvec_return, int, vecp->seq); 2361 2362 if (group->asyncstall) { 2363 cv_broadcast(&group->asyncqcv); 2364 } 2365 mutex_exit(qlock); 2366 vecp++; 2367 } 2368 } 2369 if (netret.vecdata.vecdata_val) 2370 kmem_free(netret.vecdata.vecdata_val, 2371 netret.vecdata.vecdata_len * sizeof (net_pendvec_t)); 2372 return; 2373 failed: 2374 2375 /* perhaps we have a few threads stuck ..
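 * in the async write path waiting on asyncqcv; setting asyncdis and
 * broadcasting below lets them unstall and cancel their i/o rather
 * than wait for a reply that will never arrive.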
*/ 2376 if (group->asyncstall) { 2377 group->asyncdis = 1; 2378 cv_broadcast(&group->asyncqcv); 2379 } 2380 if (netret.vecdata.vecdata_val) 2381 kmem_free(netret.vecdata.vecdata_val, 2382 netret.vecdata.vecdata_len * sizeof (net_pendvec_t)); 2383 2384 mutex_enter(qlock); 2385 if (RDC_IS_DISKQ(group)) { 2386 /* free locally alloc'd handle */ 2387 if ((aio->handle) && 2388 (aio->handle->sb_user == RDC_NULLBUFREAD)) { 2389 (void) nsc_free_buf(aio->handle); 2390 aio->handle = NULL; 2391 } 2392 aio->qhandle->sb_user--; 2393 if (aio->qhandle->sb_user == 0) { 2394 (void) _rdc_rsrv_diskq(group); 2395 rdc_fixlen(aio); 2396 (void) nsc_free_buf(aio->qhandle); 2397 aio->qhandle = NULL; 2398 aio->handle = NULL; 2399 _rdc_rlse_diskq(group); 2400 } 2401 } else { 2402 if (aio->handle) { 2403 (void) nsc_free_buf(aio->handle); 2404 aio->handle = NULL; 2405 } 2406 } 2407 mutex_exit(qlock); 2408 2409 if (reserved) { 2410 _rdc_rlse_devs(krdc, rtype); 2411 } 2412 2413 if ((waitq && krdc->io_kstats) && (!RDC_IS_DISKQ(krdc->group))) { 2414 mutex_enter(krdc->io_kstats->ks_lock); 2415 kstat_waitq_exit(KSTAT_IO_PTR(krdc->io_kstats)); 2416 mutex_exit(krdc->io_kstats->ks_lock); 2417 } 2418 2419 /* make sure that the bit is still set */ 2420 RDC_CHECK_BIT(krdc, aio->pos, aio->len); 2421 2422 if (aio->iostatus != RDC_IO_CANCELLED) 2423 aio->iostatus = RDC_IO_FAILED; 2424 } 2425 2426 2427 /* 2428 * rdc_drain_disk_queue 2429 * drain the disk queue for the whole group. Bail out if no progress 2430 * is seen for NUM_RETRIES consecutive one-second waits. 2431 * returns -1 if it bails before the queues are drained. 2432 */ 2433 #define NUM_RETRIES 15 /* Number of retries to wait if no progress */ 2434 int 2435 rdc_drain_disk_queue(int index) 2436 { 2437 rdc_k_info_t *krdc = &rdc_k_info[index]; 2438 volatile rdc_group_t *group; 2439 volatile disk_queue *diskq; 2440 int threads, counter; 2441 long blocks; 2442 2443 /* Sanity checking */ 2444 if (index >= rdc_max_sets) 2445 return (0); 2446 2447 /* 2448 * If there is no group or diskq configured, we can leave now 2449 */ 2450 if (!(group = krdc->group) || !(diskq = &group->diskq)) 2451 return (0); 2452 2453 /* 2454 * No need to wait if EMPTY and threads are gone 2455 */ 2456 counter = 0; 2457 while (!QEMPTY(diskq) || group->rdc_thrnum) { 2458 2459 /* 2460 * Capture counters to determine if progress is being made 2461 */ 2462 blocks = QBLOCKS(diskq); 2463 threads = group->rdc_thrnum; 2464 2465 /* 2466 * Wait 2467 */ 2468 delay(HZ); 2469 2470 /* 2471 * Has the group or disk queue gone away while delayed? 2472 */ 2473 if (!(group = krdc->group) || !(diskq = &group->diskq)) 2474 return (0); 2475 2476 /* 2477 * Are we still seeing progress? 2478 */ 2479 if (blocks == QBLOCKS(diskq) && threads == group->rdc_thrnum) { 2480 /* 2481 * No progress seen, increment retry counter 2482 */ 2483 if (counter++ > NUM_RETRIES) { 2484 return (-1); 2485 } 2486 } else { 2487 /* 2488 * Reset counter, as we've made progress 2489 */ 2490 counter = 0; 2491 } 2492 } 2493 2494 return (0); 2495 } 2496 2497 /* 2498 * decide what needs to be drained, disk or core, 2499 * and drain it 2500 */ 2501 int 2502 rdc_drain_queue(int index) 2503 { 2504 rdc_k_info_t *krdc = &rdc_k_info[index]; 2505 rdc_group_t *group = krdc->group; 2506 2507 if (!group) 2508 return (0); 2509 2510 if (RDC_IS_DISKQ(group)) 2511 return (rdc_drain_disk_queue(index)); 2512 if (RDC_IS_MEMQ(group)) 2513 return (rdc_drain_net_queue(index)); 2514 /* oops..
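 * neither a disk queue nor a memory queue is configured, so there is
 * nothing we know how to drain; warn in DEBUG builds and return.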
*/ 2515 #ifdef DEBUG 2516 cmn_err(CE_WARN, "!rdc_drain_queue: " 2517 "attempting drain of unknown Q type"); 2518 #endif 2519 return (0); 2520 } 2521 2522 /* 2523 * rdc_drain_net_queue 2524 * drain the async network queue for the whole group. Bail out if nothing 2525 * happens in 20 sec 2526 * returns -1 if it bails before the queues are drained. 2527 */ 2528 int 2529 rdc_drain_net_queue(int index) 2530 { 2531 rdc_k_info_t *krdc = &rdc_k_info[index]; 2532 volatile net_queue *q; 2533 int bail = 20; /* bail out in about 20 secs */ 2534 nsc_size_t blocks; 2535 2536 /* Sanity checking */ 2537 if (index >= rdc_max_sets) 2538 return (0); 2539 if (!krdc->group) 2540 return (0); 2541 /* LINTED */ 2542 if (!(q = &krdc->group->ra_queue)) 2543 return (0); 2544 2545 /* CONSTCOND */ 2546 while (1) { 2547 2548 if (((volatile rdc_aio_t *)q->net_qhead == NULL) && 2549 (krdc->group->rdc_thrnum == 0)) { 2550 break; 2551 } 2552 2553 blocks = q->blocks; 2554 2555 q = (volatile net_queue *)&krdc->group->ra_queue; 2556 2557 if ((blocks == q->blocks) && 2558 (--bail <= 0)) { 2559 break; 2560 } 2561 2562 delay(HZ); 2563 } 2564 2565 if (bail <= 0) 2566 return (-1); 2567 2568 return (0); 2569 } 2570 2571 /* 2572 * rdc_dump_queue 2573 * We want to release all the blocks currently on the network flushing queue. 2574 * We already have them logged in the bitmap. 2575 */ 2576 void 2577 rdc_dump_queue(int index) 2578 { 2579 rdc_k_info_t *krdc = &rdc_k_info[index]; 2580 rdc_aio_t *aio; 2581 net_queue *q; 2582 rdc_group_t *group; 2583 disk_queue *dq; 2584 kmutex_t *qlock; 2585 2586 group = krdc->group; 2587 2588 q = &group->ra_queue; 2589 dq = &group->diskq; 2590 2591 /* 2592 * gotta have both locks here for diskq 2593 */ 2594 2595 if (RDC_IS_DISKQ(group)) { 2596 mutex_enter(&q->net_qlock); 2597 if (q->qfill_sleeping == RDC_QFILL_AWAKE) { 2598 int tries = 3; 2599 #ifdef DEBUG_DISKQ 2600 cmn_err(CE_NOTE, 2601 "!dumpq sending diskq->memq flusher to sleep"); 2602 #endif 2603 q->qfflags |= RDC_QFILLSLEEP; 2604 mutex_exit(&q->net_qlock); 2605 while (q->qfill_sleeping == RDC_QFILL_AWAKE && tries--) 2606 delay(5); 2607 mutex_enter(&q->net_qlock); 2608 } 2609 } 2610 2611 if (RDC_IS_DISKQ(group)) { 2612 qlock = &dq->disk_qlock; 2613 (void) _rdc_rsrv_diskq(group); 2614 } else { 2615 qlock = &q->net_qlock; 2616 } 2617 2618 mutex_enter(qlock); 2619 2620 group->seq = RDC_NEWSEQ; /* reset the sequence number */ 2621 group->seqack = RDC_NEWSEQ; 2622 2623 /* if the q is on disk, dump the q->iohdr chain */ 2624 if (RDC_IS_DISKQ(group)) { 2625 rdc_dump_iohdrs(dq); 2626 2627 /* back up the nxtio pointer */ 2628 SET_QNXTIO(dq, QHEAD(dq)); 2629 SET_QCOALBOUNDS(dq, QHEAD(dq)); 2630 } 2631 2632 while (q->net_qhead) { 2633 rdc_k_info_t *tmpkrdc; 2634 aio = q->net_qhead; 2635 tmpkrdc = &rdc_k_info[aio->index]; 2636 2637 if (RDC_IS_DISKQ(group)) { 2638 aio->qhandle->sb_user--; 2639 if (aio->qhandle->sb_user == 0) { 2640 rdc_fixlen(aio); 2641 (void) nsc_free_buf(aio->qhandle); 2642 aio->qhandle = NULL; 2643 aio->handle = NULL; 2644 } 2645 } else { 2646 if (aio->handle) { 2647 (void) nsc_free_buf(aio->handle); 2648 aio->handle = NULL; 2649 } 2650 } 2651 2652 q->net_qhead = aio->next; 2653 RDC_CHECK_BIT(tmpkrdc, aio->pos, aio->len); 2654 2655 kmem_free(aio, sizeof (*aio)); 2656 if (tmpkrdc->io_kstats && !RDC_IS_DISKQ(group)) { 2657 mutex_enter(tmpkrdc->io_kstats->ks_lock); 2658 kstat_waitq_exit(KSTAT_IO_PTR(tmpkrdc->io_kstats)); 2659 mutex_exit(tmpkrdc->io_kstats->ks_lock); 2660 } 2661 2662 } 2663 2664 q->net_qtail = NULL; 2665 q->blocks = 0; 2666
q->nitems = 0; 2667 2668 /* 2669 * See if we have stalled threads. 2670 */ 2671 done: 2672 if (group->asyncstall) { 2673 group->asyncdis = 1; 2674 cv_broadcast(&group->asyncqcv); 2675 } 2676 mutex_exit(qlock); 2677 if (RDC_IS_DISKQ(group)) { 2678 mutex_exit(&q->net_qlock); 2679 _rdc_rlse_diskq(group); 2680 } 2681 2682 } 2683 2684 2685 /* 2686 * rdc_clnt_get 2687 * Get a CLIENT handle and cache it 2688 */ 2689 2690 static int 2691 rdc_clnt_get(rdc_srv_t *svp, rpcvers_t vers, struct chtab **rch, CLIENT **clp) 2692 { 2693 uint_t max_msgsize; 2694 int retries; 2695 int ret; 2696 struct cred *cred; 2697 int num_clnts = 0; 2698 register struct chtab *ch; 2699 struct chtab **plistp; 2700 CLIENT *client = 0; 2701 2702 if (rch) { 2703 *rch = 0; 2704 } 2705 2706 if (clp) { 2707 *clp = 0; 2708 } 2709 2710 retries = 6; /* Never used for COTS in Solaris */ 2711 cred = ddi_get_cred(); 2712 max_msgsize = RDC_RPC_MAX; 2713 2714 mutex_enter(&rdc_clnt_lock); 2715 2716 ch = rdc_chtable; 2717 plistp = &rdc_chtable; 2718 2719 /* find the right ch_list chain */ 2720 2721 for (ch = rdc_chtable; ch != NULL; ch = ch->ch_next) { 2722 if (ch->ch_prog == RDC_PROGRAM && 2723 ch->ch_vers == vers && 2724 ch->ch_dev == svp->ri_knconf->knc_rdev && 2725 ch->ch_protofmly != NULL && 2726 strcmp(ch->ch_protofmly, 2727 svp->ri_knconf->knc_protofmly) == 0) { 2728 /* found the correct chain to walk */ 2729 break; 2730 } 2731 plistp = &ch->ch_next; 2732 } 2733 2734 if (ch != NULL) { 2735 /* walk the ch_list and try and find a free client */ 2736 2737 for (num_clnts = 0; ch != NULL; ch = ch->ch_list, num_clnts++) { 2738 if (ch->ch_inuse == FALSE) { 2739 /* suitable handle to reuse */ 2740 break; 2741 } 2742 plistp = &ch->ch_list; 2743 } 2744 } 2745 2746 if (ch == NULL && num_clnts >= MAXCLIENTS) { 2747 /* alloc a temporary handle and return */ 2748 2749 rdc_clnt_toomany++; 2750 mutex_exit(&rdc_clnt_lock); 2751 2752 ret = clnt_tli_kcreate(svp->ri_knconf, &(svp->ri_addr), 2753 RDC_PROGRAM, vers, max_msgsize, retries, cred, &client); 2754 2755 if (ret != 0) { 2756 cmn_err(CE_NOTE, 2757 "!rdc_call: tli_kcreate failed %d", ret); 2758 return (ret); 2759 } 2760 2761 *rch = 0; 2762 *clp = client; 2763 (void) CLNT_CONTROL(client, CLSET_PROGRESS, NULL); 2764 return (ret); 2765 } 2766 2767 if (ch != NULL) { 2768 /* reuse a cached handle */ 2769 2770 ch->ch_inuse = TRUE; 2771 ch->ch_timesused++; 2772 mutex_exit(&rdc_clnt_lock); 2773 2774 *rch = ch; 2775 2776 if (ch->ch_client == NULL) { 2777 ret = clnt_tli_kcreate(svp->ri_knconf, &(svp->ri_addr), 2778 RDC_PROGRAM, vers, max_msgsize, retries, 2779 cred, &ch->ch_client); 2780 if (ret != 0) { 2781 ch->ch_inuse = FALSE; 2782 return (ret); 2783 } 2784 2785 (void) CLNT_CONTROL(ch->ch_client, CLSET_PROGRESS, 2786 NULL); 2787 *clp = ch->ch_client; 2788 2789 return (0); 2790 } else { 2791 /* 2792 * Consecutive calls to CLNT_CALL() on the same client handle 2793 * get the same transaction ID. We want a new xid per call, 2794 * so we first reinitialise the handle. 
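 * clnt_tli_kinit() re-arms the cached handle in place, so we keep
 * the transport endpoint rather than paying for a destroy/create
 * cycle on every call, which is the point of the chtab cache.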
2795 */ 2796 (void) clnt_tli_kinit(ch->ch_client, svp->ri_knconf, 2797 &(svp->ri_addr), max_msgsize, retries, cred); 2798 2799 *clp = ch->ch_client; 2800 return (0); 2801 } 2802 } 2803 2804 /* create new handle and cache it */ 2805 ch = (struct chtab *)kmem_zalloc(sizeof (*ch), KM_SLEEP); 2806 2807 if (ch) { 2808 ch->ch_inuse = TRUE; 2809 ch->ch_prog = RDC_PROGRAM; 2810 ch->ch_vers = vers; 2811 ch->ch_dev = svp->ri_knconf->knc_rdev; 2812 ch->ch_protofmly = (char *)kmem_zalloc( 2813 strlen(svp->ri_knconf->knc_protofmly)+1, KM_SLEEP); 2814 if (ch->ch_protofmly) 2815 (void) strcpy(ch->ch_protofmly, 2816 svp->ri_knconf->knc_protofmly); 2817 *plistp = ch; 2818 } 2819 2820 mutex_exit(&rdc_clnt_lock); 2821 2822 ret = clnt_tli_kcreate(svp->ri_knconf, &(svp->ri_addr), 2823 RDC_PROGRAM, vers, max_msgsize, retries, cred, clp); 2824 2825 if (ret != 0) { 2826 if (ch) 2827 ch->ch_inuse = FALSE; 2828 cmn_err(CE_NOTE, "!rdc_call: tli_kcreate failed %d", ret); 2829 return (ret); 2830 } 2831 2832 *rch = ch; 2833 if (ch) 2834 ch->ch_client = *clp; 2835 2836 (void) CLNT_CONTROL(*clp, CLSET_PROGRESS, NULL); 2837 2838 return (ret); 2839 } 2840 2841 2842 long rdc_clnt_count = 0; 2843 2844 /* 2845 * rdc_clnt_call 2846 * Arguments: 2847 * rdc_srv_t *svp - rdc servinfo 2848 * rpcproc_t proc - rpc procedure id 2849 * rpcvers_t vers - protocol version 2850 * xdrproc_t xargs - xdr function for the arguments 2851 * caddr_t argsp - the arguments themselves 2852 * xdrproc_t xres - xdr function for the results 2853 * caddr_t resp - where to decode the results 2854 * struct timeval *timeout - rpc timeout 2855 * Performs an RPC client call using the specified protocol and version 2856 */ 2857 2858 int 2859 rdc_clnt_call(rdc_srv_t *svp, rpcproc_t proc, rpcvers_t vers, 2860 xdrproc_t xargs, caddr_t argsp, 2861 xdrproc_t xres, caddr_t resp, struct timeval *timeout) 2862 { 2863 CLIENT *rh = NULL; 2864 int err; 2865 int tries = 0; 2866 struct chtab *ch = NULL; 2867 2868 err = rdc_clnt_get(svp, vers, &ch, &rh); 2869 if (err || !rh) 2870 return (err); 2871 2872 do { 2873 DTRACE_PROBE3(rdc_clnt_call_1, 2874 CLIENT *, rh, rpcproc_t, proc, xdrproc_t, xargs); 2875 2876 err = cl_call_sig(rh, proc, xargs, argsp, xres, resp, *timeout); 2877 2878 DTRACE_PROBE1(rdc_clnt_call_end, int, err); 2879 2880 switch (err) { 2881 case RPC_SUCCESS: /* bail now */ 2882 goto done; 2883 case RPC_INTR: /* No recovery from this */ 2884 goto done; 2885 case RPC_PROGVERSMISMATCH: 2886 goto done; 2887 case RPC_TLIERROR: 2888 /* fall thru */ 2889 case RPC_XPRTFAILED: 2890 /* Delay here to err on side of caution */ 2891 /* fall thru */ 2892 case RPC_VERSMISMATCH: 2893 2894 default: 2895 if (IS_UNRECOVERABLE_RPC(err)) { 2896 goto done; 2897 } 2898 tries++; 2899 /* 2900 * The call is in progress (over COTS) 2901 * Try the CLNT_CALL again, but don't 2902 * print a noisy error message 2903 */ 2904 if (err == RPC_INPROGRESS) 2905 break; 2906 cmn_err(CE_NOTE, "!SNDR client: err %d %s", 2907 err, clnt_sperrno(err)); 2908 } 2909 } while (tries && (tries < 2)); 2910 done: 2911 ++rdc_clnt_count; 2912 rdc_clnt_free(ch, rh); 2913 return (err); 2914 } 2915 2916 2917 /* 2918 * Call an rpc from the client side, not caring which protocol is used.
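 * Start at the interface's negotiated version, or RDC_VERS_MAX when no
 * interface is known, and walk downwards one version at a time on
 * RPC_PROGVERSMISMATCH until RDC_VERS_MIN is passed; any successful
 * downgrade is remembered in ip->rpc_version.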
2919 */ 2920 int 2921 rdc_clnt_call_any(rdc_srv_t *svp, rdc_if_t *ip, rpcproc_t proc, 2922 xdrproc_t xargs, caddr_t argsp, 2923 xdrproc_t xres, caddr_t resp, struct timeval *timeout) 2924 { 2925 rpcvers_t vers; 2926 int rc; 2927 2928 if (ip != NULL) { 2929 vers = ip->rpc_version; 2930 } else { 2931 vers = RDC_VERS_MAX; 2932 } 2933 2934 do { 2935 rc = rdc_clnt_call(svp, proc, vers, xargs, argsp, 2936 xres, resp, timeout); 2937 2938 if (rc == RPC_PROGVERSMISMATCH) { 2939 /* 2940 * Downgrade and try again. 2941 */ 2942 vers--; 2943 } 2944 } while ((vers >= RDC_VERS_MIN) && (rc == RPC_PROGVERSMISMATCH)); 2945 2946 if ((rc == 0) && (ip != NULL) && (vers != ip->rpc_version)) { 2947 mutex_enter(&rdc_ping_lock); 2948 ip->rpc_version = vers; 2949 mutex_exit(&rdc_ping_lock); 2950 } 2951 2952 return (rc); 2953 } 2954 2955 /* 2956 * Call an rpc from the client side, starting with protocol specified 2957 */ 2958 int 2959 rdc_clnt_call_walk(rdc_k_info_t *krdc, rpcproc_t proc, xdrproc_t xargs, 2960 caddr_t argsp, xdrproc_t xres, caddr_t resp, 2961 struct timeval *timeout) 2962 { 2963 int rc; 2964 rpcvers_t vers; 2965 rdc_srv_t *svp = krdc->lsrv; 2966 rdc_if_t *ip = krdc->intf; 2967 vers = krdc->rpc_version; 2968 2969 do { 2970 rc = rdc_clnt_call(svp, proc, vers, xargs, argsp, 2971 xres, resp, timeout); 2972 2973 if (rc == RPC_PROGVERSMISMATCH) { 2974 /* 2975 * Downgrade and try again. 2976 */ 2977 vers--; 2978 } 2979 } while ((vers >= RDC_VERS_MIN) && (rc == RPC_PROGVERSMISMATCH)); 2980 2981 if ((rc == 0) && (ip != NULL) && (vers != ip->rpc_version)) { 2982 mutex_enter(&rdc_ping_lock); 2983 ip->rpc_version = vers; 2984 mutex_exit(&rdc_ping_lock); 2985 } 2986 2987 return (rc); 2988 } 2989 2990 /* 2991 * rdc_clnt_free 2992 * Free a client structure into the cache, or if this was a temporary 2993 * handle allocated above MAXCLIENTS, destroy it. 2994 */ 2995 static void 2996 rdc_clnt_free(struct chtab *ch, CLIENT *clp) 2997 { 2998 if (ch != NULL) { 2999 /* cached client, just clear inuse flag and return */ 3000 ASSERT(ch->ch_client == clp); 3001 ch->ch_inuse = FALSE; 3002 return; 3003 } 3004 3005 /* temporary handle allocated above MAXCLIENTS, so destroy it */ 3006 3007 if (clp->cl_auth) { 3008 AUTH_DESTROY(clp->cl_auth); 3009 clp->cl_auth = 0; 3010 } 3011 3012 CLNT_DESTROY(clp); 3013 } 3014 3015 3016 /* 3017 * _rdc_clnt_destroy 3018 * Free a chain (ch_list or ch_next) of cached clients 3019 */ 3020 static int 3021 _rdc_clnt_destroy(struct chtab **p, const int list) 3022 { 3023 struct chtab *ch; 3024 int leak = 0; 3025 3026 if (!p) 3027 return (0); 3028 3029 while (*p != NULL) { 3030 ch = *p; 3031 3032 /* 3033 * unlink from the chain 3034 * - this leaks the client if it was inuse 3035 */ 3036 3037 *p = list ? 
ch->ch_list : ch->ch_next; 3038 3039 if (!ch->ch_inuse) { 3040 /* unused client - destroy it */ 3041 3042 if (ch->ch_client) { 3043 if (ch->ch_client->cl_auth) { 3044 AUTH_DESTROY(ch->ch_client->cl_auth); 3045 ch->ch_client->cl_auth = 0; 3046 } 3047 3048 CLNT_DESTROY(ch->ch_client); 3049 ch->ch_client = 0; 3050 } 3051 3052 if (ch->ch_protofmly) 3053 kmem_free(ch->ch_protofmly, 3054 strlen(ch->ch_protofmly)+1); 3055 3056 kmem_free(ch, sizeof (*ch)); 3057 } else { 3058 /* remember client leak */ 3059 leak++; 3060 } 3061 } 3062 3063 return (leak); 3064 } 3065 3066 3067 /* 3068 * rdc_clnt_destroy 3069 * Free client caching table on unconfigure 3070 */ 3071 void 3072 rdc_clnt_destroy(void) 3073 { 3074 struct chtab *ch; 3075 int leak = 0; 3076 3077 mutex_enter(&rdc_clnt_lock); 3078 3079 /* destroy each ch_list chain */ 3080 3081 for (ch = rdc_chtable; ch; ch = ch->ch_next) { 3082 leak += _rdc_clnt_destroy(&ch->ch_list, 1); 3083 } 3084 3085 /* destroy the main ch_next chain */ 3086 leak += _rdc_clnt_destroy(&rdc_chtable, 0); 3087 3088 if (leak) { 3089 /* we are about to leak clients */ 3090 cmn_err(CE_WARN, 3091 "!rdc_clnt_destroy: leaking %d inuse clients", leak); 3092 } 3093 3094 mutex_exit(&rdc_clnt_lock); 3095 } 3096 3097 #ifdef DEBUG 3098 /* 3099 * Function to send an asynchronous net_data6 request 3100 * direct to a server to allow the generation of 3101 * out of order requests for ZatoIchi tests. 3102 */ 3103 int 3104 rdc_async6(void *arg, int mode, int *rvp) 3105 { 3106 int index; 3107 rdc_async6_t async6; 3108 struct net_data6 data6; 3109 rdc_k_info_t *krdc; 3110 rdc_u_info_t *urdc; 3111 char *data; 3112 int datasz; 3113 char *datap; 3114 int rc; 3115 struct timeval t; 3116 struct netwriteres netret; 3117 int i; 3118 3119 rc = 0; 3120 *rvp = 0; 3121 /* 3122 * copyin the user's arguments. 3123 */ 3124 if (ddi_copyin(arg, &async6, sizeof (async6), mode) < 0) { 3125 return (EFAULT); 3126 } 3127 3128 /* 3129 * search by the secondary host and file. 3130 */ 3131 mutex_enter(&rdc_conf_lock); 3132 for (index = 0; index < rdc_max_sets; index++) { 3133 urdc = &rdc_u_info[index]; 3134 krdc = &rdc_k_info[index]; 3135 3136 if (!IS_CONFIGURED(krdc)) 3137 continue; 3138 if (!IS_ENABLED(urdc)) 3139 continue; 3140 if (!IS_ASYNC(urdc)) 3141 continue; 3142 if (krdc->rpc_version < RDC_VERSION6) 3143 continue; 3144 3145 if ((strncmp(urdc->secondary.intf, async6.sechost, 3146 MAX_RDC_HOST_SIZE) == 0) && 3147 (strncmp(urdc->secondary.file, async6.secfile, 3148 NSC_MAXPATH) == 0)) { 3149 break; 3150 } 3151 } 3152 mutex_exit(&rdc_conf_lock); 3153 if (index >= rdc_max_sets) { 3154 return (ENOENT); 3155 } 3156 3157 if (async6.spos != -1) { 3158 if ((async6.spos < async6.pos) || 3159 ((async6.spos + async6.slen) > 3160 (async6.pos + async6.len))) { 3161 cmn_err(CE_WARN, "!Sub task not within range " 3162 "start %d length %d sub start %d sub length %d", 3163 async6.pos, async6.len, async6.spos, async6.slen); 3164 return (EIO); 3165 } 3166 } 3167 3168 datasz = FBA_SIZE(1); 3169 data = kmem_alloc(datasz, KM_SLEEP); 3170 datap = data; 3171 while (datap < &data[datasz]) { 3172 /* LINTED */ 3173 *datap++ = async6.pat; 3174 } 3175 3176 /* 3177 * Fill in the net databuffer prior to transmission. 
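 * local_cd/cd identify the set at each end, pos/len cover the whole
 * i/o, and sfba/nfba/endoblk describe the fragment carried by this
 * request (the whole i/o when spos is -1).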
3178 */ 3179 3180 data6.local_cd = krdc->index; 3181 if (krdc->remote_index == -1) { 3182 cmn_err(CE_WARN, "!Remote index not known"); 3183 kmem_free(data, datasz); 3184 return (EIO); 3185 } else { 3186 data6.cd = krdc->remote_index; 3187 } 3188 data6.pos = async6.pos; 3189 data6.len = async6.len; 3190 data6.flag = 0; 3191 data6.idx = async6.idx; 3192 data6.seq = async6.seq; 3193 3194 if (async6.spos == -1) { 3195 data6.sfba = async6.pos; 3196 data6.nfba = async6.len; 3197 data6.endoblk = 1; 3198 3199 } else { 3200 data6.sfba = async6.spos; 3201 data6.nfba = async6.slen; 3202 data6.endoblk = async6.endind; 3203 } 3204 3205 data6.data.data_len = datasz; 3206 data6.data.data_val = data; 3207 3208 t.tv_sec = rdc_rpc_tmout; 3209 t.tv_usec = 0; 3210 3211 netret.vecdata.vecdata_val = NULL; 3212 netret.vecdata.vecdata_len = 0; 3213 3214 3215 rc = rdc_clnt_call(krdc->lsrv, RDCPROC_WRITE6, krdc->rpc_version, 3216 xdr_net_data6, (char *)&data6, xdr_netwriteres, (char *)&netret, 3217 &t); 3218 3219 kmem_free(data, datasz); 3220 if (rc == 0) { 3221 if (netret.result < 0) { 3222 rc = -netret.result; 3223 } 3224 cmn_err(CE_NOTE, "!async6: seq %u result %d index %d " 3225 "pendcnt %d", 3226 netret.seq, netret.result, netret.index, 3227 netret.vecdata.vecdata_len); 3228 for (i = 0; i < netret.vecdata.vecdata_len; i++) { 3229 net_pendvec_t pvec; 3230 bcopy(netret.vecdata.vecdata_val + i, &pvec, 3231 sizeof (net_pendvec_t)); 3232 cmn_err(CE_NOTE, "!Seq %u pos %llu len %llu", 3233 pvec.seq, (unsigned long long)pvec.apos, 3234 (unsigned long long)pvec.alen); 3235 } 3236 if (netret.vecdata.vecdata_val) 3237 kmem_free(netret.vecdata.vecdata_val, 3238 netret.vecdata.vecdata_len * 3239 sizeof (net_pendvec_t)); 3240 } else { 3241 cmn_err(CE_NOTE, "!async6: rpc call failed %d", rc); 3242 } 3243 *rvp = netret.index; 3244 return (rc); 3245 } 3246 3247 /* 3248 * Function to send an net_read6 request 3249 * direct to a server to allow the generation of 3250 * read requests. 3251 */ 3252 int 3253 rdc_readgen(void *arg, int mode, int *rvp) 3254 { 3255 int index; 3256 rdc_readgen_t readgen; 3257 rdc_readgen32_t readgen32; 3258 struct rread6 read6; 3259 struct rread read5; 3260 rdc_k_info_t *krdc; 3261 int ret; 3262 struct timeval t; 3263 struct rdcrdresult rr; 3264 int err; 3265 3266 *rvp = 0; 3267 rr.rr_bufsize = 0; /* rpc data buffer length (bytes) */ 3268 rr.rr_data = NULL; /* rpc data buffer */ 3269 if (ddi_model_convert_from(mode & FMODELS) == DDI_MODEL_ILP32) { 3270 if (ddi_copyin(arg, &readgen32, sizeof (readgen32), mode)) { 3271 return (EFAULT); 3272 } 3273 (void) strncpy(readgen.sechost, readgen32.sechost, 3274 MAX_RDC_HOST_SIZE); 3275 (void) strncpy(readgen.secfile, readgen32.secfile, NSC_MAXPATH); 3276 readgen.len = readgen32.len; 3277 readgen.pos = readgen32.pos; 3278 readgen.idx = readgen32.idx; 3279 readgen.flag = readgen32.flag; 3280 readgen.data = (void *)(unsigned long)readgen32.data; 3281 readgen.rpcversion = readgen32.rpcversion; 3282 } else { 3283 if (ddi_copyin(arg, &readgen, sizeof (readgen), mode)) { 3284 return (EFAULT); 3285 } 3286 } 3287 switch (readgen.rpcversion) { 3288 case 5: 3289 case 6: 3290 break; 3291 default: 3292 return (EINVAL); 3293 } 3294 3295 mutex_enter(&rdc_conf_lock); 3296 index = rdc_lookup_byhostdev(readgen.sechost, readgen.secfile); 3297 if (index >= 0) { 3298 krdc = &rdc_k_info[index]; 3299 } 3300 if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) { 3301 mutex_exit(&rdc_conf_lock); 3302 return (ENODEV); 3303 } 3304 /* 3305 * we should really call setbusy here. 
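 * Without it, nothing stops a concurrent disable from tearing the set
 * down once rdc_conf_lock is dropped; tolerable only because this is
 * DEBUG-only test code.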
3306 */ 3307 mutex_exit(&rdc_conf_lock); 3308 3309 t.tv_sec = rdc_rpc_tmout; 3310 t.tv_usec = 0; 3311 if (krdc->remote_index == -1) { 3312 cmn_err(CE_WARN, "!Remote index not known"); 3313 ret = EIO; 3314 goto out; 3315 } 3316 if (readgen.rpcversion == 6) { 3317 read6.cd = krdc->remote_index; 3318 read6.len = readgen.len; 3319 read6.pos = readgen.pos; 3320 read6.idx = readgen.idx; 3321 read6.flag = readgen.flag; 3322 } else { 3323 read5.cd = krdc->remote_index; 3324 read5.len = readgen.len; 3325 read5.pos = readgen.pos; 3326 read5.idx = readgen.idx; 3327 read5.flag = readgen.flag; 3328 } 3329 3330 if (readgen.flag & RDC_RREAD_START) { 3331 if (readgen.rpcversion == 6) { 3332 err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ6, 3333 RDC_VERSION6, xdr_rread6, (char *)&read6, 3334 xdr_int, (char *)&ret, &t); 3335 } else { 3336 err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ5, 3337 RDC_VERSION5, xdr_rread, (char *)&read5, 3338 xdr_int, (char *)&ret, &t); 3339 } 3340 if (err == 0) { 3341 *rvp = ret; 3342 ret = 0; 3343 } else { 3344 ret = EPROTO; 3345 } 3346 } else { 3347 if (readgen.rpcversion == 6) { 3348 err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ6, 3349 RDC_VERSION6, xdr_rread6, (char *)&read6, 3350 xdr_rdresult, (char *)&rr, &t); 3351 } else { 3352 err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ5, 3353 RDC_VERSION5, xdr_rread, (char *)&read5, 3354 xdr_rdresult, (char *)&rr, &t); 3355 } 3356 if (err == 0) { 3357 if (rr.rr_status != RDC_OK) { 3358 ret = EIO; 3359 goto out; 3360 } 3361 *rvp = rr.rr_bufsize; 3362 if (ddi_copyout(rr.rr_data, readgen.data, 3363 rr.rr_bufsize, mode) != 0) { 3364 ret = EFAULT; 3365 goto out; 3366 } 3367 ret = 0; 3368 } else { 3369 ret = EPROTO; 3370 goto out; 3371 } 3372 } 3373 out: 3374 if (rr.rr_data) { 3375 kmem_free(rr.rr_data, rr.rr_bufsize); 3376 } 3377 return (ret); 3378 } 3379 3380 3381 #endif
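
/*
 * Usage sketch (illustrative only): the calling convention used
 * throughout this file is to fill in a timeout and hand xdr routines
 * for both directions to rdc_clnt_call(), e.g.
 *
 *	struct timeval t;
 *	int err, ret;
 *
 *	t.tv_sec = rdc_rpc_tmout;
 *	t.tv_usec = 0;
 *	err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ6, RDC_VERSION6,
 *	    xdr_rread6, (char *)&read6, xdr_int, (char *)&ret, &t);
 *
 * Callers that must span protocol versions should use
 * rdc_clnt_call_any() or rdc_clnt_call_walk(), which downgrade on
 * RPC_PROGVERSMISMATCH as described above.
 */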