1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
  23  * Use is subject to license terms.
  24  */
  25 
  26 /* Network data replicator Client side */
  27 
  28 
  29 #include <sys/types.h>
  30 #include <sys/debug.h>
  31 #include <sys/ksynch.h>
  32 #include <sys/cmn_err.h>
  33 #include <sys/kmem.h>
  34 #include <sys/cred.h>
  35 #include <sys/byteorder.h>
  36 #include <sys/errno.h>
  37 
  38 #ifdef _SunOS_2_6
  39 /*
  40  * on 2.6 both dki_lock.h and rpc/types.h define bool_t so we
  41  * define enum_t here as it is all we need from rpc/types.h
  42  * anyway and make it look like we included it. Yuck.
  43  */
  44 #define _RPC_TYPES_H
  45 typedef int enum_t;
  46 #else
  47 #ifndef DS_DDICT
  48 #include <rpc/types.h>
  49 #endif
  50 #endif /* _SunOS_2_6 */
  51 
  52 #ifndef DS_DDICT
  53 #include <rpc/auth.h>
  54 #include <rpc/svc.h>
  55 #include <rpc/xdr.h>
  56 #endif
  57 #include <sys/ddi.h>
  58 
  59 #include <sys/nsc_thread.h>
  60 #ifdef DS_DDICT
  61 #include <sys/nsctl/contract.h>
  62 #endif
  63 #include <sys/nsctl/nsctl.h>
  64 
  65 #include <sys/sdt.h>              /* dtrace is S10 or later */
  66 
  67 #include "rdc_io.h"
  68 #include "rdc_clnt.h"
  69 #include "rdc_bitmap.h"
  70 #include "rdc_diskq.h"
  71 
  72 
  73 kmutex_t rdc_clnt_lock;
  74 
  75 #ifdef DEBUG
  76 int noflush = 0;
  77 #endif
  78 
  79 int rdc_rpc_tmout = RDC_CLNT_TMOUT;
  80 static void rdc_clnt_free(struct chtab *, CLIENT *);
  81 static void _rdc_remote_flush(rdc_aio_t *);
  82 
  83 void rdc_flush_memq(int index);
  84 void rdc_flush_diskq(int index);
  85 int rdc_drain_net_queue(int index);
  86 void rdc_flusher_thread(int index);
  87 int  rdc_diskq_enqueue(rdc_k_info_t *krdc, rdc_aio_t *);
  88 void rdc_init_diskq_header(rdc_group_t *grp, dqheader *hd);
  89 void rdc_dump_iohdrs(disk_queue *dq);
  90 rdc_aio_t *rdc_dequeue(rdc_k_info_t *krdc, int *rc);
  91 void rdc_clr_iohdr(rdc_k_info_t *krdc, nsc_off_t qpos);
  92 void rdc_close_diskq(rdc_group_t *krdc);
  93 
  94 int rdc_writer(int index);
  95 
  96 static struct chtab *rdc_chtable = NULL;
  97 static int rdc_clnt_toomany;
  98 #ifdef DEBUG
  99 static int rdc_ooreply;
 100 #endif
 101 
 102 extern void rdc_fail_diskq(rdc_k_info_t *krdc, int wait, int flag);
 103 extern int _rdc_rsrv_diskq(rdc_group_t *group);
 104 extern void _rdc_rlse_diskq(rdc_group_t *group);
 105 
 106 static enum clnt_stat
 107 cl_call_sig(struct __client *rh, rpcproc_t proc,
 108             xdrproc_t xargs, caddr_t argsp, xdrproc_t xres,
 109             caddr_t resp, struct timeval secs)
 110 {
 111         enum clnt_stat stat;
 112         k_sigset_t smask;
 113         sigintr(&smask, 0);
 114         rh->cl_nosignal = TRUE;
 115         stat = ((*(rh)->cl_ops->cl_call)\
 116             (rh, proc, xargs, argsp, xres, resp, secs));
 117         rh->cl_nosignal = FALSE;
 118         sigunintr(&smask);
 119         return (stat);
 120 }
 121 
 122 int
 123 rdc_net_getsize(int index, uint64_t *sizeptr)
 124 {
 125         struct timeval t;
 126         int err, size;
 127         rdc_k_info_t *krdc = &rdc_k_info[index];
 128         int remote_index = krdc->remote_index;
 129 
 130         *sizeptr = 0;
 131         if (krdc->remote_index < 0)
 132                 return (EINVAL);
 133 
 134         t.tv_sec = rdc_rpc_tmout;
 135         t.tv_usec = 0;
 136 
 137 #ifdef DEBUG
 138         if (krdc->intf == NULL)
 139                 cmn_err(CE_WARN,
 140                     "!rdc_net_getsize: null intf for index %d", index);
 141 #endif
 142         if (krdc->rpc_version <= RDC_VERSION5) {
 143                 err = rdc_clnt_call(krdc->lsrv, RDCPROC_GETSIZE,
 144                     krdc->rpc_version, xdr_int, (char *)&remote_index,
 145                     xdr_int, (char *)&size, &t);
 146                 if (err == 0)
 147                         *sizeptr = size;
 148         } else {
 149                 err = rdc_clnt_call(krdc->lsrv, RDCPROC_GETSIZE6,
 150                     krdc->rpc_version, xdr_int, (char *)&remote_index,
 151                     xdr_u_longlong_t, (char *)sizeptr, &t);
 152         }
 153         return (err);
 154 }
 155 
 156 
 157 int
 158 rdc_net_state(int index, int options)
 159 {
 160         struct timeval t;
 161         int err;
 162         int remote_index = -1;
 163         rdc_u_info_t *urdc = &rdc_u_info[index];
 164         rdc_k_info_t *krdc = &rdc_k_info[index];
 165         struct set_state s;
 166         struct set_state4 s4;
 167         char neta[32], rneta[32];
 168         unsigned short *sp;
 169 
 170         t.tv_sec = rdc_rpc_tmout;
 171         t.tv_usec = 0;
 172 
 173         if (krdc->rpc_version < RDC_VERSION7) {
 174                 s4.netaddrlen = urdc->primary.addr.len;
 175                 s4.rnetaddrlen = urdc->secondary.addr.len;
 176                 bcopy(urdc->primary.addr.buf, s4.netaddr, s4.netaddrlen);
 177                 bcopy(urdc->secondary.addr.buf, s4.rnetaddr, s4.rnetaddrlen);
 178                 (void) strncpy(s4.pfile, urdc->primary.file, RDC_MAXNAMLEN);
 179                 (void) strncpy(s4.sfile, urdc->secondary.file, RDC_MAXNAMLEN);
 180                 s4.flag = options;
 181 
 182                 err = rdc_clnt_call(krdc->lsrv, RDCPROC_STATE,
 183                     krdc->rpc_version, xdr_set_state4, (char *)&s4, xdr_int,
 184                     (char *)&remote_index, &t);
 185         } else {
 186                 s.netaddrlen = urdc->primary.addr.len;
 187                 s.rnetaddrlen = urdc->secondary.addr.len;
 188                 s.netaddr.buf = neta;
 189                 s.rnetaddr.buf = rneta;
 190                 bcopy(urdc->primary.addr.buf, s.netaddr.buf, s.netaddrlen);
 191                 bcopy(urdc->secondary.addr.buf, s.rnetaddr.buf, s.rnetaddrlen);
 192                 s.netaddr.len = urdc->primary.addr.len;
 193                 s.rnetaddr.len = urdc->secondary.addr.len;
 194                 s.netaddr.maxlen = urdc->primary.addr.len;
 195                 s.rnetaddr.maxlen = urdc->secondary.addr.len;
 196                 sp = (unsigned short *)s.netaddr.buf;
 197                 *sp = htons(*sp);
 198                 sp = (unsigned short *)s.rnetaddr.buf;
 199                 *sp = htons(*sp);
 200                 s.pfile = urdc->primary.file;
 201                 s.sfile = urdc->secondary.file;
 202                 s.flag = options;
 203 
 204                 err = rdc_clnt_call(krdc->lsrv, RDCPROC_STATE,
 205                     krdc->rpc_version, xdr_set_state, (char *)&s, xdr_int,
 206                     (char *)&remote_index, &t);
 207         }
 208 
 209         if (err)
 210                 return (-1);
 211         else
 212                 return (remote_index);
 213 }
 214 
 215 
 216 /*
 217  * rdc_net_getbmap
 218  * gets the bitmaps from remote side and or's them  with remote bitmap
 219  */
 220 int
 221 rdc_net_getbmap(int index, int size)
 222 {
 223         struct timeval t;
 224         int err;
 225         struct bmap b;
 226         struct bmap6 b6;
 227         rdc_k_info_t *krdc;
 228 
 229         krdc = &rdc_k_info[index];
 230 
 231         if (krdc->remote_index < 0)
 232                 return (EINVAL);
 233 
 234         t.tv_sec = rdc_rpc_tmout;
 235         t.tv_usec = 0;
 236 #ifdef DEBUG
 237         if (krdc->intf == NULL)
 238                 cmn_err(CE_WARN,
 239                     "!rdc_net_getbmap: null intf for index %d", index);
 240 #endif
 241 
 242         if (krdc->rpc_version <= RDC_VERSION5) {
 243                 b.cd = krdc->remote_index;
 244                 b.dual = index;
 245                 b.size = size;
 246                 err = rdc_clnt_call(krdc->lsrv, RDCPROC_BMAP,
 247                     krdc->rpc_version, xdr_bmap, (char *)&b, xdr_int,
 248                     (char *)&err, &t);
 249 
 250         } else {
 251                 b6.cd = krdc->remote_index;
 252                 b6.dual = index;
 253                 b6.size = size;
 254                 err = rdc_clnt_call(krdc->lsrv, RDCPROC_BMAP6,
 255                     krdc->rpc_version, xdr_bmap6, (char *)&b6, xdr_int,
 256                     (char *)&err, &t);
 257         }
 258         return (err);
 259 }
 260 
 261 int sndr_proto = 0;
 262 
 263 /*
 264  * return state corresponding to rdc_host
 265  */
 266 int
 267 rdc_net_getstate(rdc_k_info_t *krdc, int *serial_mode, int *use_mirror,
 268     int *mirror_down, int network)
 269 {
 270         int err;
 271         struct timeval t;
 272         int state;
 273         rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
 274         struct set_state s;
 275 #ifdef sparc
 276         struct set_state4 s4;
 277 #endif
 278         char neta[32];
 279         char rneta[32];
 280         unsigned short *sp;
 281         char *setp = (char *)&s;
 282         xdrproc_t xdr_proc = xdr_set_state;
 283 
 284         if (krdc->lsrv && (krdc->intf == NULL || krdc->intf->if_down) &&
 285             network) /* fail fast */
 286                 return (-1);
 287 
 288         s.netaddrlen = urdc->primary.addr.len;
 289         s.rnetaddrlen = urdc->secondary.addr.len;
 290         s.pfile = urdc->primary.file;
 291         s.sfile = urdc->secondary.file;
 292         s.netaddr.buf = neta;
 293         s.rnetaddr.buf = rneta;
 294         bcopy(urdc->primary.addr.buf, s.netaddr.buf, s.netaddrlen);
 295         bcopy(urdc->secondary.addr.buf, s.rnetaddr.buf, s.rnetaddrlen);
 296         sp = (unsigned short *) s.netaddr.buf;
 297         *sp = htons(*sp);
 298         sp = (unsigned short *) s.rnetaddr.buf;
 299         *sp = htons(*sp);
 300         s.netaddr.len = urdc->primary.addr.len;
 301         s.rnetaddr.len = urdc->secondary.addr.len;
 302         s.netaddr.maxlen = urdc->primary.addr.maxlen;
 303         s.rnetaddr.maxlen = urdc->secondary.addr.maxlen;
 304         s.flag = 0;
 305 
 306         t.tv_sec = rdc_rpc_tmout;
 307         t.tv_usec = 0;
 308 
 309         if (sndr_proto)
 310                 krdc->rpc_version = sndr_proto;
 311         else
 312                 krdc->rpc_version = RDC_VERS_MAX;
 313 
 314 again:
 315         err = rdc_clnt_call(krdc->lsrv, RDCPROC_GETSTATE4, krdc->rpc_version,
 316             xdr_proc, setp, xdr_int, (char *)&state, &t);
 317 
 318         if (err == RPC_PROGVERSMISMATCH && (krdc->rpc_version !=
 319             RDC_VERS_MIN)) {
 320                 if (krdc->rpc_version-- == RDC_VERSION7) {
 321                         /* set_state struct changed with v7 of protocol */
 322 #ifdef sparc
 323                         s4.netaddrlen = urdc->primary.addr.len;
 324                         s4.rnetaddrlen = urdc->secondary.addr.len;
 325                         bcopy(urdc->primary.addr.buf, s4.netaddr,
 326                             s4.netaddrlen);
 327                         bcopy(urdc->secondary.addr.buf, s4.rnetaddr,
 328                             s4.rnetaddrlen);
 329                         (void) strncpy(s4.pfile, urdc->primary.file,
 330                             RDC_MAXNAMLEN);
 331                         (void) strncpy(s4.sfile, urdc->secondary.file,
 332                             RDC_MAXNAMLEN);
 333                         s4.flag = 0;
 334                         xdr_proc = xdr_set_state4;
 335                         setp = (char *)&s4;
 336 #else
 337                         /* x64 can not use protocols < 7 */
 338                         return (-1);
 339 #endif
 340                 }
 341                 goto again;
 342         }
 343 #ifdef DEBUG
 344         cmn_err(CE_NOTE, "!sndr get_state: Protocol ver %d", krdc->rpc_version);
 345 #endif
 346 
 347         if (err) {
 348                 return (-1);
 349         }
 350 
 351         if (state == -1)
 352                 return (-1);
 353 
 354         if (serial_mode)
 355                 *serial_mode = (state >> 2) & 1;
 356         if (use_mirror)
 357                 *use_mirror = (state >> 1) & 1;
 358         if (mirror_down)
 359                 *mirror_down = state & 1;
 360 
 361         return (0);
 362 }
 363 
 364 
 365 static struct xdr_discrim rdres_discrim[2] = {
 366         { (int)RDC_OK, xdr_readok },
 367         { __dontcare__, NULL_xdrproc_t }
 368 };
 369 
 370 
 371 /*
 372  * Reply from remote read (client side)
 373  */
 374 static bool_t
 375 xdr_rdresult(XDR *xdrs, readres *rr)
 376 {
 377 
 378         return (xdr_union(xdrs, (enum_t *)&(rr->rr_status),
 379             (caddr_t)&(rr->rr_ok), rdres_discrim, xdr_void));
 380 }
 381 
 382 static int
 383 rdc_rrstatus_decode(int status)
 384 {
 385         int ret = 0;
 386 
 387         if (status != RDC_OK) {
 388                 switch (status) {
 389                 case RDCERR_NOENT:
 390                         ret = ENOENT;
 391                         break;
 392                 case RDCERR_NOMEM:
 393                         ret = ENOMEM;
 394                         break;
 395                 default:
 396                         ret = EIO;
 397                         break;
 398                 }
 399         }
 400 
 401         return (ret);
 402 }
 403 
 404 
 405 int
 406 rdc_net_read(int local_index, int remote_index, nsc_buf_t *handle,
 407     nsc_off_t fba_pos, nsc_size_t fba_len)
 408 {
 409         struct rdcrdresult rr;
 410         rdc_k_info_t *krdc;
 411         rdc_u_info_t *urdc;
 412         struct rread list;
 413         struct rread6 list6;
 414         struct timeval t;
 415         uchar_t *sv_addr;
 416         nsc_vec_t *vec;
 417         int rpc_flag;
 418         nsc_size_t sv_len;
 419         int err;
 420         int ret;
 421         nsc_size_t len;
 422         nsc_size_t maxfbas;
 423         int transflag;
 424 
 425         if (handle == NULL)
 426                 return (EINVAL);
 427 
 428         if (!RDC_HANDLE_LIMITS(handle, fba_pos, fba_len)) {
 429 #ifdef DEBUG
 430                 cmn_err(CE_NOTE, "!rdc_net_read: handle bounds");
 431 #endif
 432                 return (EINVAL);
 433         }
 434 
 435         krdc = &rdc_k_info[local_index];
 436         urdc = &rdc_u_info[local_index];
 437 
 438         maxfbas = MAX_RDC_FBAS;
 439 
 440         if (krdc->remote_fd && !(rdc_get_vflags(urdc) & RDC_FCAL_FAILED)) {
 441                 nsc_buf_t *remote_h = NULL;
 442                 int reserved = 0;
 443 
 444                 ret = nsc_reserve(krdc->remote_fd, NSC_MULTI);
 445                 if (RDC_SUCCESS(ret)) {
 446                         reserved = 1;
 447                         ret = nsc_alloc_buf(krdc->remote_fd, fba_pos, fba_len,
 448                             NSC_RDBUF, &remote_h);
 449                 }
 450                 if (RDC_SUCCESS(ret)) {
 451                         ret = nsc_copy(remote_h, handle, fba_pos, fba_pos,
 452                             fba_len);
 453                         if (RDC_SUCCESS(ret)) {
 454                                 (void) nsc_free_buf(remote_h);
 455                                 nsc_release(krdc->remote_fd);
 456                                 return (0);
 457                         }
 458                 }
 459                 rdc_group_enter(krdc);
 460                 rdc_set_flags(urdc, RDC_FCAL_FAILED);
 461                 rdc_group_exit(krdc);
 462                 if (remote_h)
 463                         (void) nsc_free_buf(remote_h);
 464                 if (reserved)
 465                         nsc_release(krdc->remote_fd);
 466         }
 467 
 468         t.tv_sec = rdc_rpc_tmout;
 469         t.tv_usec = 0;
 470 
 471         if (rdc_get_vflags(urdc) & RDC_VOL_FAILED)
 472                 rpc_flag = RDC_RREAD_FAIL;
 473         else
 474                 rpc_flag = 0;
 475 
 476 #ifdef DEBUG
 477         if (krdc->intf == NULL)
 478                 cmn_err(CE_WARN,
 479                     "!rdc_net_read: null intf for index %d", local_index);
 480 #endif
 481         /*
 482          * switch on proto version.
 483          */
 484         len = fba_len;          /* length (FBAs) still to xfer */
 485         rr.rr_bufsize = 0;      /* rpc data buffer length (bytes) */
 486         rr.rr_data = NULL;      /* rpc data buffer */
 487         transflag = rpc_flag | RDC_RREAD_START; /* setup rpc */
 488         if (krdc->rpc_version <= RDC_VERSION5) {
 489                 ASSERT(fba_pos <= INT32_MAX);
 490                 list.pos = (int)fba_pos; /* fba position of start of chunk */
 491                 list.cd = remote_index; /* remote end cd */
 492                 /* send setup rpc */
 493                 list.flag = transflag;
 494                 ASSERT(len <= INT32_MAX);
 495                 list.len = (int)len;                    /* total fba length */
 496                 err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ5,
 497                     krdc->rpc_version, xdr_rread, (char *)&list, xdr_int,
 498                     (char *)&ret, &t);
 499 
 500         } else {
 501                 list6.pos = fba_pos;    /* fba position of start of chunk */
 502                 list6.cd = remote_index;        /* remote end cd */
 503                 /* send setup rpc */
 504                 list6.flag = transflag; /* setup rpc */
 505                 ASSERT(len <= INT32_MAX);
 506                 list6.len = (int)len;                   /* total fba length */
 507                 err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ6,
 508                     krdc->rpc_version, xdr_rread6, (char *)&list6, xdr_int,
 509                     (char *)&ret, &t);
 510         }
 511 
 512         if (err) {
 513 #ifdef DEBUG
 514                 cmn_err(CE_NOTE, "!rdc_net_read: setup err %d", err);
 515 #endif
 516                 if (err == RPC_INTR)
 517                         ret = EINTR;
 518                 else
 519                         ret = ENOLINK;
 520 
 521                 goto remote_rerror;
 522         }
 523 
 524         if (ret == 0) {         /* No valid index from r_net_read */
 525 #ifdef DEBUG
 526                 cmn_err(CE_NOTE,
 527                     "!rdc_net_read: no valid index from r_net_read");
 528 #endif
 529                 return (ENOBUFS);
 530         }
 531         transflag = rpc_flag | RDC_RREAD_DATA;
 532         if (krdc->rpc_version <= RDC_VERSION5) {
 533                 list.idx = ret;         /* save idx to return to server */
 534                 list.flag = transflag;
 535                                         /* move onto to data xfer rpcs */
 536         } else {
 537                 list6.idx = ret;        /* save idx to return to server */
 538                 list6.flag = transflag;
 539         }
 540 
 541         /* find starting position in handle */
 542 
 543         vec = handle->sb_vec;
 544 
 545         fba_pos -= handle->sb_pos;
 546 
 547         for (; fba_pos >= FBA_NUM(vec->sv_len); vec++)
 548                 fba_pos -= FBA_NUM(vec->sv_len);
 549 
 550         sv_addr = vec->sv_addr + FBA_SIZE(fba_pos);  /* data in vector */
 551         sv_len = vec->sv_len - FBA_SIZE(fba_pos);    /* bytes in vector */
 552 
 553         while (len) {
 554                 nsc_size_t translen;
 555                 if (len > maxfbas) {
 556                         translen = maxfbas;
 557                 } else {
 558                         translen = len;
 559                 }
 560 
 561                 if (FBA_SIZE(translen) > sv_len) {
 562                         translen = FBA_NUM(sv_len);
 563                 }
 564 
 565                 len -= translen;
 566                 if (len == 0) {
 567                         /* last data xfer rpc - tell server to cleanup */
 568                         transflag |= RDC_RREAD_END;
 569                 }
 570 
 571                 if (!rr.rr_data || (nsc_size_t)rr.rr_bufsize !=
 572                     FBA_SIZE(translen)) {
 573                         if (rr.rr_data)
 574                                 kmem_free(rr.rr_data, rr.rr_bufsize);
 575 
 576                         ASSERT(FBA_SIZE(translen) <= INT32_MAX);
 577                         rr.rr_bufsize = FBA_SIZE(translen);
 578                         rr.rr_data = kmem_alloc(rr.rr_bufsize, KM_NOSLEEP);
 579                 }
 580 
 581                 if (!rr.rr_data) {
 582                         /* error */
 583 #ifdef DEBUG
 584                         cmn_err(CE_NOTE, "!rdc_net_read: kmem_alloc failed");
 585 #endif
 586                         return (ENOMEM);
 587                 }
 588 
 589                 /* get data from remote end */
 590 
 591 #ifdef DEBUG
 592                 if (krdc->intf == NULL)
 593                         cmn_err(CE_WARN,
 594                             "!rdc_net_read: null intf for index %d",
 595                             local_index);
 596 #endif
 597                 if (krdc->io_kstats) {
 598                         mutex_enter(krdc->io_kstats->ks_lock);
 599                         kstat_runq_enter(KSTAT_IO_PTR(krdc->io_kstats));
 600                         mutex_exit(krdc->io_kstats->ks_lock);
 601                 }
 602                 /*CONSTCOND*/
 603                 ASSERT(RDC_MAXDATA <= INT32_MAX);
 604                 ASSERT(translen <= RDC_MAXDATA);
 605                 if (krdc->rpc_version <= RDC_VERSION5) {
 606                         list.len = (int)translen;
 607                         list.flag = transflag;
 608                         err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ5,
 609                             krdc->rpc_version, xdr_rread, (char *)&list,
 610                             xdr_rdresult, (char *)&rr, &t);
 611                 } else {
 612                         list6.len = (int)translen;
 613                         list6.flag = transflag;
 614                         err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ6,
 615                             krdc->rpc_version, xdr_rread6, (char *)&list6,
 616                             xdr_rdresult, (char *)&rr, &t);
 617                 }
 618 
 619                 if (krdc->io_kstats) {
 620                         mutex_enter(krdc->io_kstats->ks_lock);
 621                         kstat_runq_exit(KSTAT_IO_PTR(krdc->io_kstats));
 622                         mutex_exit(krdc->io_kstats->ks_lock);
 623                 }
 624 
 625                 if (err) {
 626 #ifdef DEBUG
 627                         cmn_err(CE_NOTE, "!rdc_net_read: rpc err %d", err);
 628 #endif
 629                         if (err == RPC_INTR) {
 630                                 ret = EINTR;
 631                         } else {
 632                                 ret = ENOLINK;
 633                         }
 634 
 635                         goto remote_rerror;
 636                 }
 637 
 638                 if (rr.rr_status != RDC_OK) {
 639                         ret = rdc_rrstatus_decode(rr.rr_status);
 640                         if (!ret)
 641                                 ret = EIO;
 642 
 643                         goto remote_rerror;
 644                 }
 645 
 646                 /* copy into handle */
 647 
 648                 bcopy(rr.rr_data, sv_addr, (size_t)rr.rr_bufsize);
 649 
 650                 /* update counters */
 651 
 652                 sv_addr += rr.rr_bufsize;
 653                 if (krdc->rpc_version <= RDC_VERSION5) {
 654                         list.pos += translen;
 655                 } else {
 656                         list6.pos += translen;
 657                 }
 658                 if (krdc->io_kstats) {
 659                         KSTAT_IO_PTR(krdc->io_kstats)->reads++;
 660                         KSTAT_IO_PTR(krdc->io_kstats)->nread += rr.rr_bufsize;
 661                 }
 662                 ASSERT(sv_len <= INT32_MAX);
 663                 ASSERT(sv_len >= (nsc_size_t)rr.rr_bufsize);
 664                 sv_len -= rr.rr_bufsize;
 665 
 666                 if (sv_len == 0) {
 667                         /* goto next vector */
 668                         vec++;
 669                         sv_addr = vec->sv_addr;
 670                         sv_len = vec->sv_len;
 671                 }
 672         }
 673 
 674         if (rr.rr_data)
 675                 kmem_free(rr.rr_data, rr.rr_bufsize);
 676 
 677         return (0);
 678 
 679 remote_rerror:
 680         if (rr.rr_data)
 681                 kmem_free(rr.rr_data, rr.rr_bufsize);
 682 
 683         return (ret ? ret : ENOLINK);
 684 }
 685 
 686 /*
 687  * rdc_net_write
 688  * Main remote write client side
 689  * Handles protocol selection as well as requests for remote allocation
 690  * and data transfer
 691  * Does local IO for FCAL
 692  * caller must clear bitmap on success
 693  */
 694 
 695 int
 696 rdc_net_write(int local_index, int remote_index, nsc_buf_t *handle,
 697     nsc_off_t fba_pos, nsc_size_t fba_len, uint_t aseq, int qpos,
 698     netwriteres *netres)
 699 {
 700         rdc_k_info_t *krdc;
 701         rdc_u_info_t *urdc;
 702         struct timeval t;
 703         nsc_vec_t *vec;
 704         int sv_len;
 705         nsc_off_t fpos;
 706         int err;
 707         struct netwriteres netret;
 708         struct netwriteres *netresptr;
 709         struct net_data5 dlist5;
 710         struct net_data6 dlist6;
 711         int ret;
 712         nsc_size_t maxfbas;
 713         int transflag;
 714         int translen;
 715         int transendoblk;
 716         char *transptr;
 717         int vflags;
 718 
 719         if (handle == NULL)
 720                 return (EINVAL);
 721 
 722         /* if not a diskq buffer */
 723         if ((qpos == -1) && (!RDC_HANDLE_LIMITS(handle, fba_pos, fba_len))) {
 724 #ifdef DEBUG
 725                 cmn_err(CE_NOTE, "!rdc_net_write: handle bounds");
 726 #endif
 727                 return (EINVAL);
 728         }
 729 
 730 
 731         t.tv_sec = rdc_rpc_tmout;
 732         t.tv_usec = 0;
 733 
 734         krdc = &rdc_k_info[local_index];
 735         urdc = &rdc_u_info[local_index];
 736 
 737         maxfbas = MAX_RDC_FBAS;
 738 
 739         /* FCAL IO */
 740         if (krdc->remote_fd && !(rdc_get_vflags(urdc) & RDC_FCAL_FAILED)) {
 741                 nsc_buf_t *remote_h = NULL;
 742                 int reserved = 0;
 743 
 744                 ret = nsc_reserve(krdc->remote_fd, NSC_MULTI);
 745                 if (RDC_SUCCESS(ret)) {
 746                         reserved = 1;
 747                         ret = nsc_alloc_buf(krdc->remote_fd, fba_pos, fba_len,
 748                             NSC_WRBUF, &remote_h);
 749                 }
 750                 if (RDC_SUCCESS(ret)) {
 751                         ret = nsc_copy(handle, remote_h, fba_pos, fba_pos,
 752                             fba_len);
 753                         if (RDC_SUCCESS(ret))
 754                                 ret = nsc_write(remote_h, fba_pos, fba_len, 0);
 755                         if (RDC_SUCCESS(ret)) {
 756                                 (void) nsc_free_buf(remote_h);
 757                                 nsc_release(krdc->remote_fd);
 758                                 return (0);
 759                         }
 760                 }
 761                 rdc_group_enter(krdc);
 762                 rdc_set_flags(urdc, RDC_FCAL_FAILED);
 763                 rdc_group_exit(krdc);
 764                 if (remote_h)
 765                         (void) nsc_free_buf(remote_h);
 766                 if (reserved)
 767                         nsc_release(krdc->remote_fd);
 768         }
 769 
 770         /*
 771          * At this point we must decide which protocol we are using and
 772          * do the right thing
 773          */
 774         netret.vecdata.vecdata_val = NULL;
 775         netret.vecdata.vecdata_len = 0;
 776         if (netres) {
 777                 netresptr = netres;
 778         } else {
 779                 netresptr = &netret;
 780         }
 781 
 782         vflags = rdc_get_vflags(urdc);
 783 
 784         if (vflags & (RDC_VOL_FAILED|RDC_BMP_FAILED))
 785                 transflag = RDC_RWRITE_FAIL;
 786         else
 787                 transflag = 0;
 788 
 789 
 790 #ifdef DEBUG
 791         if (krdc->intf == NULL)
 792                 cmn_err(CE_WARN, "!rdc_net_write: null intf for index %d",
 793                     local_index);
 794 #endif
 795 
 796         vec = handle->sb_vec;
 797 
 798         /*
 799          * find starting position in vector
 800          */
 801         if ((qpos == -1) || (handle->sb_user == RDC_NULLBUFREAD))
 802                 fpos = fba_pos - handle->sb_pos;
 803         else
 804                 fpos = (qpos + 1) - handle->sb_pos;
 805 
 806         for (; fpos >= FBA_NUM(vec->sv_len); vec++)
 807                 fpos -= FBA_NUM(vec->sv_len);
 808         sv_len = vec->sv_len - FBA_SIZE(fpos);       /* bytes in vector */
 809         transptr = (char *)vec->sv_addr + FBA_SIZE(fpos);
 810 
 811         if (krdc->rpc_version <= RDC_VERSION5) {
 812                 dlist5.local_cd = local_index;
 813                 dlist5.cd = remote_index;
 814                 ASSERT(fba_len <= INT32_MAX);
 815                 ASSERT(fba_pos <= INT32_MAX);
 816                 dlist5.len = (int)fba_len;
 817                 dlist5.pos = (int)fba_pos;
 818                 dlist5.idx = -1; /* Starting index */
 819                 dlist5.flag = transflag;
 820                 dlist5.seq = aseq;              /* sequence number */
 821                 dlist5.sfba = (int)fba_pos;     /* starting fba for this xfer */
 822         } else {
 823                 dlist6.local_cd = local_index;
 824                 dlist6.cd = remote_index;
 825                 ASSERT(fba_len <= INT32_MAX);
 826                 dlist6.len = (int)fba_len;
 827                 dlist6.qpos = qpos;
 828                 dlist6.pos = fba_pos;
 829                 dlist6.idx = -1; /* Starting index */
 830                 dlist6.flag = transflag;
 831                 dlist6.seq = aseq;              /* sequence number */
 832                 dlist6.sfba = fba_pos;          /* starting fba for this xfer */
 833         }
 834 
 835         transendoblk = 0;
 836         while (fba_len) {
 837                 if (!transptr) {
 838 #ifdef DEBUG
 839                         cmn_err(CE_WARN,
 840                             "!rdc_net_write: walked off end of handle!");
 841 #endif
 842                         ret = EINVAL;
 843                         goto remote_error;
 844                 }
 845 
 846                 if (fba_len > maxfbas) {
 847                         ASSERT(maxfbas <= INT32_MAX);
 848                         translen = (int)maxfbas;
 849                 } else {
 850                         ASSERT(fba_len <= INT32_MAX);
 851                         translen = (int)fba_len;
 852                 }
 853 
 854                 if (FBA_SIZE(translen) > sv_len) {
 855                         translen = FBA_NUM(sv_len);
 856                 }
 857 
 858                 fba_len -= translen;
 859                 if (fba_len == 0) {
 860                         /* last data xfer - tell server to commit */
 861                         transendoblk = 1;
 862                 }
 863 
 864 
 865 #ifdef DEBUG
 866                 if (krdc->intf == NULL)
 867                         cmn_err(CE_WARN,
 868                             "!rdc_net_write: null intf for index %d",
 869                             local_index);
 870 #endif
 871                 DTRACE_PROBE(rdc_netwrite_clntcall_start);
 872 
 873                 if (krdc->io_kstats) {
 874                         mutex_enter(krdc->io_kstats->ks_lock);
 875                         kstat_runq_enter(KSTAT_IO_PTR(krdc->io_kstats));
 876                         mutex_exit(krdc->io_kstats->ks_lock);
 877                 }
 878                 if (krdc->rpc_version <= RDC_VERSION5) {
 879                         ret = 0;
 880                         dlist5.nfba = translen;
 881                         dlist5.endoblk = transendoblk;
 882                         dlist5.data.data_len = FBA_SIZE(translen);
 883                         dlist5.data.data_val = transptr;
 884                         err = rdc_clnt_call(krdc->lsrv, RDCPROC_WRITE5,
 885                             krdc->rpc_version, xdr_net_data5,
 886                             (char *)&dlist5, xdr_int,
 887                             (char *)&ret, &t);
 888                         if (ret >= 0) {
 889                                 netresptr->result = 0;
 890                                 netresptr->index = ret;
 891                         } else {
 892                                 netresptr->result = ret;
 893                         }
 894                 } else {
 895                         netresptr->result = 0;
 896                         dlist6.nfba = translen;
 897                         dlist6.endoblk = transendoblk;
 898                         dlist6.data.data_len = FBA_SIZE(translen);
 899                         dlist6.data.data_val = transptr;
 900                         err = rdc_clnt_call(krdc->lsrv, RDCPROC_WRITE6,
 901                             krdc->rpc_version, xdr_net_data6,
 902                             (char *)&dlist6, xdr_netwriteres,
 903                             (char *)netresptr, &t);
 904                 }
 905 
 906                 if (krdc->io_kstats) {
 907                         mutex_enter(krdc->io_kstats->ks_lock);
 908                         kstat_runq_exit(KSTAT_IO_PTR(krdc->io_kstats));
 909                         mutex_exit(krdc->io_kstats->ks_lock);
 910                 }
 911 
 912                 DTRACE_PROBE(rdc_netwrite_clntcall_end);
 913                 ret = netresptr->result;
 914                 if (err) {
 915                         if (err == RPC_INTR)
 916                                 ret = EINTR;
 917                         else if (err && ret != EPROTO)
 918                                 ret = ENOLINK;
 919 #ifdef DEBUG
 920                         cmn_err(CE_NOTE,
 921                             "!rdc_net_write(5): cd %d err %d ret %d",
 922                             remote_index, err, ret);
 923 #endif
 924                         goto remote_error;
 925                 }
 926                 /* Error from r_net_write5 */
 927                 if (netresptr->result < 0) {
 928 #ifdef DEBUG
 929                         cmn_err(CE_NOTE,
 930                             "!rdc_net_write: r_net_write(5) "
 931                             "returned: %d",
 932                             -netresptr->result);
 933 #endif
 934                         ret = -netresptr->result;
 935                         if (netret.vecdata.vecdata_val)
 936                                 kmem_free(netret.vecdata.vecdata_val,
 937                                     netret.vecdata.vecdata_len *
 938                                     sizeof (net_pendvec_t));
 939                         goto remote_error;
 940                 } else if (netresptr->index == 0) {
 941 #ifdef DEBUG
 942                         cmn_err(CE_NOTE,
 943                             "!rdc_net_write: no valid index from "
 944                             "r_net_write(5)");
 945 #endif
 946                         ret = ENOBUFS;
 947                         if (netret.vecdata.vecdata_val)
 948                                 kmem_free(netret.vecdata.vecdata_val,
 949                                     netret.vecdata.vecdata_len *
 950                                     sizeof (net_pendvec_t));
 951                         goto remote_error;
 952                 }
 953                 if (krdc->rpc_version <= RDC_VERSION5) {
 954                         dlist5.idx = netresptr->index;
 955                         dlist5.sfba += dlist5.nfba;
 956                 } else {
 957                         dlist6.idx = netresptr->index;
 958                         dlist6.sfba += dlist6.nfba;
 959                 }
 960                 /* update counters */
 961                 if (krdc->io_kstats) {
 962                         KSTAT_IO_PTR(krdc->io_kstats)->writes++;
 963                         KSTAT_IO_PTR(krdc->io_kstats)->nwritten +=
 964                             FBA_SIZE(translen);
 965                 }
 966                 transptr += FBA_SIZE(translen);
 967                 sv_len -= FBA_SIZE(translen);
 968 
 969                 if (sv_len <= 0) {
 970                         /* goto next vector */
 971                         vec++;
 972                         transptr = (char *)vec->sv_addr;
 973                         sv_len = vec->sv_len;
 974                 }
 975         }
 976         /*
 977          * this can't happen.....
 978          */
 979         if (netret.vecdata.vecdata_val)
 980                 kmem_free(netret.vecdata.vecdata_val,
 981                     netret.vecdata.vecdata_len *
 982                     sizeof (net_pendvec_t));
 983 
 984         return (0);
 985 
 986 remote_error:
 987         return (ret ? ret : ENOLINK);
 988 }
 989 
 990 void
 991 rdc_fixlen(rdc_aio_t *aio)
 992 {
 993         nsc_vec_t *vecp = aio->qhandle->sb_vec;
 994         nsc_size_t len = 0;
 995 
 996         while (vecp->sv_addr) {
 997                 len += FBA_NUM(vecp->sv_len);
 998                 vecp++;
 999         }
1000         aio->qhandle->sb_len = len;
1001 }
1002 
1003 /*
1004  * rdc_dump_alloc_bufs_cd
1005  * Dump allocated buffers (rdc_net_hnd's) for the specified cd.
1006  * this could be the flusher failing, if so, don't do the delay forever
1007  * Returns: 0 (success), EAGAIN (caller needs to try again).
1008  */
1009 int
1010 rdc_dump_alloc_bufs_cd(int index)
1011 {
1012         rdc_k_info_t *krdc;
1013         rdc_aio_t *aio;
1014         net_queue *q;
1015         disk_queue *dq;
1016         kmutex_t *qlock;
1017 
1018         krdc = &rdc_k_info[index];
1019 
1020 
1021         if (!krdc->c_fd) {
1022                 /* cannot do anything! */
1023 #ifdef DEBUG
1024                 cmn_err(CE_WARN, "!rdc_dump_alloc_bufs_cd(%d): c_fd NULL",
1025                     index);
1026 #endif
1027                 return (0);
1028         }
1029         rdc_dump_dsets(index);
1030 
1031         dq = &krdc->group->diskq;
1032 
1033         if (RDC_IS_DISKQ(krdc->group)) {
1034                 qlock = QLOCK(dq);
1035                 (void) _rdc_rsrv_diskq(krdc->group);
1036         } else {
1037                 qlock = &krdc->group->ra_queue.net_qlock;
1038         }
1039 
1040         /*
1041          * Now dump the async queue anonymous buffers
1042          * if we are a diskq, the we are using the diskq mutex.
1043          * However, we are flushing from diskq to memory queue
1044          * so we now need to grab the memory lock also
1045          */
1046 
1047         q = &krdc->group->ra_queue;
1048 
1049         if (RDC_IS_DISKQ(krdc->group)) {
1050                 mutex_enter(&q->net_qlock);
1051                 if (q->qfill_sleeping == RDC_QFILL_AWAKE) {
1052                         int tries = 5;
1053 #ifdef DEBUG_DISKQ
1054                         cmn_err(CE_NOTE,
1055                             "!dumpalloccd sending diskq->memq flush to sleep");
1056 #endif
1057                         q->qfflags |= RDC_QFILLSLEEP;
1058                         mutex_exit(&q->net_qlock);
1059 
1060                         while (q->qfill_sleeping == RDC_QFILL_AWAKE && tries--)
1061                                 delay(5);
1062                         mutex_enter(&q->net_qlock);
1063                 }
1064         }
1065 
1066         mutex_enter(qlock);
1067 
1068         while ((q->net_qhead != NULL)) {
1069                 rdc_k_info_t *tmpkrdc;
1070                 aio = q->net_qhead;
1071                 tmpkrdc = &rdc_k_info[aio->index];
1072 
1073                 if (RDC_IS_DISKQ(krdc->group)) {
1074                         aio->qhandle->sb_user--;
1075                         if (aio->qhandle->sb_user == 0) {
1076                                 rdc_fixlen(aio);
1077                                 (void) nsc_free_buf(aio->qhandle);
1078                                 aio->qhandle = NULL;
1079                                 aio->handle = NULL;
1080                         }
1081                 } else {
1082                         if (aio->handle) {
1083                                 (void) nsc_free_buf(aio->handle);
1084                                 aio->handle = NULL;
1085                         }
1086                 }
1087 
1088                 if (tmpkrdc->io_kstats && !RDC_IS_DISKQ(krdc->group)) {
1089                         mutex_enter(tmpkrdc->io_kstats->ks_lock);
1090                         kstat_waitq_exit(KSTAT_IO_PTR(tmpkrdc->io_kstats));
1091                         mutex_exit(tmpkrdc->io_kstats->ks_lock);
1092                 }
1093                 q->net_qhead = q->net_qhead->next;
1094                 q->blocks -= aio->len;
1095                 q->nitems--;
1096 
1097                 RDC_CHECK_BIT(tmpkrdc, aio->pos, aio->len);
1098 
1099                 kmem_free(aio, sizeof (*aio));
1100         }
1101         q->net_qtail = NULL;
1102 
1103         if (krdc->group->asyncstall) {
1104                 krdc->group->asyncdis = 1;
1105                 cv_broadcast(&krdc->group->asyncqcv);
1106         }
1107         if (krdc->group->sleepq) {
1108                 rdc_sleepqdiscard(krdc->group);
1109         }
1110 
1111         krdc->group->seq = RDC_NEWSEQ;
1112         krdc->group->seqack = RDC_NEWSEQ;
1113         if (RDC_IS_DISKQ(krdc->group)) {
1114                 rdc_dump_iohdrs(dq);
1115                 SET_QNXTIO(dq, QHEAD(dq));
1116                 SET_QCOALBOUNDS(dq, QHEAD(dq));
1117         }
1118         mutex_exit(qlock);
1119 
1120         if (RDC_IS_DISKQ(krdc->group)) {
1121                 mutex_exit(&q->net_qlock);
1122                 _rdc_rlse_diskq(krdc->group);
1123         }
1124 
1125         return (0);
1126 }
1127 
1128 
1129 /*
1130  * rdc_dump_alloc_bufs
1131  * We have an error on the link
1132  * Try to dump all of the allocated bufs so we can cleanly recover
1133  * and not hang
1134  */
1135 void
1136 rdc_dump_alloc_bufs(rdc_if_t *ip)
1137 {
1138         rdc_k_info_t *krdc;
1139         int repeat;
1140         int index;
1141 
1142         for (index = 0; index < rdc_max_sets; index++) {
1143                 do {
1144                         krdc = &rdc_k_info[index];
1145                         repeat = 0;
1146                         if (krdc->intf == ip) {
1147                                 if (rdc_dump_alloc_bufs_cd(index) == EAGAIN) {
1148                                         repeat = 1;
1149                                         delay(2);
1150                                 }
1151                         }
1152                 } while (repeat);
1153         }
1154 }
1155 
1156 /*
1157  * returns 1 if the the throttle should throttle, 0 if not.
1158  */
1159 int
1160 _rdc_diskq_isfull(disk_queue *q, long len)
1161 {
1162         /* ---T----H----N--- */
1163         mutex_enter(QLOCK(q));
1164 
1165         if (FITSONQ(q, len + 1)) {
1166                 mutex_exit(QLOCK(q));
1167                 return (0);
1168         }
1169         mutex_exit(QLOCK(q));
1170         return (1);
1171 }
1172 
1173 void
1174 _rdc_async_throttle(rdc_k_info_t *this, long len)
1175 {
1176         rdc_k_info_t *krdc;
1177         rdc_u_info_t *urdc;
1178         int print_msg = 1;
1179         int tries = RDC_FUTILE_ATTEMPTS;
1180 
1181         /*
1182          * Throttle entries on queue
1183          */
1184 
1185         /* Need to take the 1-many case into account, checking all sets */
1186 
1187         /* ADD HANDY HUERISTIC HERE TO SLOW DOWN IO */
1188         for (krdc = this; /* CSTYLED */; krdc = krdc->many_next) {
1189                 urdc = &rdc_u_info[krdc->index];
1190 
1191                 /*
1192                  * this may be the last set standing in a one to many setup.
1193                  * we may also be stuck in unintercept, after marking
1194                  * the volume as not enabled, but have not removed it
1195                  * from the many list resulting in an endless loop if
1196                  * we just continue here. Lets jump over this stuff
1197                  * and check to see if we are the only dude here.
1198                  */
1199                 if (!IS_ENABLED(urdc))
1200                         goto thischeck;
1201 
1202                 if (IS_ASYNC(urdc) && RDC_IS_MEMQ(krdc->group)) {
1203                         net_queue *q = &krdc->group->ra_queue;
1204                         while ((q->blocks + q->inflbls) > urdc->maxqfbas ||
1205                             (q->nitems + q->inflitems) > urdc->maxqitems) {
1206 
1207                                 if (!IS_ENABLED(urdc)) /* disable race */
1208                                         goto thischeck;
1209 
1210                                 if (!krdc->group->rdc_writer)
1211                                         (void) rdc_writer(krdc->index);
1212                                 delay(2);
1213                                 q->throttle_delay++;
1214                         }
1215                 }
1216 
1217                 /* do a much more aggressive delay, get disk flush going */
1218                 if (IS_ASYNC(urdc) && RDC_IS_DISKQ(krdc->group)) {
1219                         disk_queue *q = &krdc->group->diskq;
1220                         while ((!IS_QSTATE(q, RDC_QNOBLOCK)) &&
1221                             (_rdc_diskq_isfull(q, len)) &&
1222                             (!IS_STATE(urdc, RDC_DISKQ_FAILED))) {
1223                                 if (print_msg) {
1224                                         cmn_err(CE_WARN, "!rdc async throttle:"
1225                                             " disk queue %s full",
1226                                             &urdc->disk_queue[0]);
1227 
1228                                         print_msg = 0;
1229                                 }
1230                                 if (!IS_ENABLED(urdc)) /* disable race */
1231                                         goto thischeck;
1232 
1233                                 if (!krdc->group->rdc_writer)
1234                                         (void) rdc_writer(krdc->index);
1235                                 delay(10);
1236                                 q->throttle_delay += 10;
1237 
1238                                 if (!(tries--) && IS_STATE(urdc, RDC_QUEUING)) {
1239                                         cmn_err(CE_WARN, "!SNDR: disk queue "
1240                                             "%s full & not flushing. giving up",
1241                                             &urdc->disk_queue[0]);
1242                                         cmn_err(CE_WARN, "!SNDR: %s:%s entering"
1243                                             " logging mode",
1244                                             urdc->secondary.intf,
1245                                             urdc->secondary.file);
1246                                         rdc_fail_diskq(krdc, RDC_WAIT,
1247                                             RDC_DOLOG | RDC_NOFAIL);
1248                                         mutex_enter(QLOCK(q));
1249                                         cv_broadcast(&q->qfullcv);
1250                                         mutex_exit(QLOCK(q));
1251                                 }
1252 
1253                         }
1254                         if ((IS_QSTATE(q, RDC_QNOBLOCK)) &&
1255                             _rdc_diskq_isfull(q, len) &&
1256                             !IS_STATE(urdc, RDC_DISKQ_FAILED)) {
1257                                 if (print_msg) {
1258                                         cmn_err(CE_WARN, "!disk queue %s full",
1259                                             &urdc->disk_queue[0]);
1260                                         print_msg = 0;
1261                                 }
1262                                 rdc_fail_diskq(krdc, RDC_WAIT,
1263                                     RDC_DOLOG | RDC_NOFAIL);
1264                                 mutex_enter(QLOCK(q));
1265                                 cv_broadcast(&q->qfullcv);
1266                                 mutex_exit(QLOCK(q));
1267                         }
1268                 }
1269 
1270 thischeck:
1271                 if (krdc->many_next == this)
1272                         break;
1273         }
1274 }
1275 
1276 int rdc_coalesce = 1;
1277 static int rdc_joins = 0;
1278 
1279 int
1280 rdc_aio_coalesce(rdc_aio_t *queued, rdc_aio_t *new)
1281 {
1282         nsc_buf_t *h = NULL;
1283         int rc;
1284         rdc_k_info_t *krdc;
1285         uint_t bitmask;
1286 
1287         if (rdc_coalesce == 0)
1288                 return (0);             /* don't even try */
1289 
1290         if ((queued == NULL) ||
1291             (queued->handle == NULL) ||
1292             (new->handle == NULL)) {
1293                 return (0);             /* existing queue is empty */
1294         }
1295         if (queued->index != new->index || queued->len + new->len >
1296             MAX_RDC_FBAS) {
1297                 return (0);             /* I/O to big */
1298         }
1299         if ((queued->pos + queued->len == new->pos) ||
1300             (new->pos + new->len == queued->pos)) {
1301                 rc = nsc_alloc_abuf(queued->pos, queued->len + new->len, 0,
1302                     &h);
1303                 if (!RDC_SUCCESS(rc)) {
1304                         if (h != NULL)
1305                                 (void) nsc_free_buf(h);
1306                         return (0);             /* couldn't do coalesce */
1307                 }
1308                 rc = nsc_copy(queued->handle, h, queued->pos, queued->pos,
1309                     queued->len);
1310                 if (!RDC_SUCCESS(rc)) {
1311                         (void) nsc_free_buf(h);
1312                         return (0);             /* couldn't do coalesce */
1313                 }
1314                 rc = nsc_copy(new->handle, h, new->pos, new->pos,
1315                     new->len);
1316                 if (!RDC_SUCCESS(rc)) {
1317                         (void) nsc_free_buf(h);
1318                         return (0);             /* couldn't do coalesce */
1319                 }
1320 
1321                 krdc = &rdc_k_info[queued->index];
1322 
1323                 RDC_SET_BITMASK(queued->pos, queued->len, &bitmask);
1324                 RDC_CLR_BITMAP(krdc, queued->pos, queued->len, \
1325                     bitmask, RDC_BIT_BUMP);
1326 
1327                 RDC_SET_BITMASK(new->pos, new->len, &bitmask);
1328                 RDC_CLR_BITMAP(krdc, new->pos, new->len, \
1329                     bitmask, RDC_BIT_BUMP);
1330 
1331                 (void) nsc_free_buf(queued->handle);
1332                 (void) nsc_free_buf(new->handle);
1333                 queued->handle = h;
1334                 queued->len += new->len;
1335                 bitmask = 0;
1336                 /*
1337                  * bump the ref count back up
1338                  */
1339 
1340                 RDC_SET_BITMAP(krdc, queued->pos, queued->len, &bitmask);
1341                 return (1);     /* new I/O succeeds last I/O queued */
1342         }
1343         return (0);
1344 }
1345 
1346 int
1347 rdc_memq_enqueue(rdc_k_info_t *krdc, rdc_aio_t *aio)
1348 {
1349         net_queue *q;
1350         rdc_group_t *group;
1351 
1352         group = krdc->group;
1353         q = &group->ra_queue;
1354 
1355         mutex_enter(&q->net_qlock);
1356 
1357         if (rdc_aio_coalesce(q->net_qtail, aio)) {
1358                 rdc_joins++;
1359                 q->blocks += aio->len;
1360                 kmem_free(aio, sizeof (*aio));
1361                 goto out;
1362         }
1363         aio->seq = group->seq++;
1364         if (group->seq < aio->seq)
1365                 group->seq = RDC_NEWSEQ + 1; /* skip magics */
1366 
1367         if (q->net_qhead == NULL) {
1368                 /* adding to empty q */
1369                 q->net_qhead = q->net_qtail = aio;
1370 
1371 #ifdef DEBUG
1372                 if (q->blocks != 0 || q->nitems != 0) {
1373                         cmn_err(CE_PANIC,
1374                             "rdc enqueue: q %p, qhead 0, q blocks %" NSC_SZFMT
1375                             ", nitems %" NSC_SZFMT,
1376                             (void *) q, q->blocks, q->nitems);
1377                 }
1378 #endif
1379 
1380         } else {
1381                 /* discontiguous, add aio to q tail */
1382                 q->net_qtail->next = aio;
1383                 q->net_qtail = aio;
1384         }
1385 
1386         q->blocks += aio->len;
1387         q->nitems++;
1388 
1389         if (krdc->io_kstats) {
1390                 mutex_enter(krdc->io_kstats->ks_lock);
1391                 kstat_waitq_enter(KSTAT_IO_PTR(krdc->io_kstats));
1392                 mutex_exit(krdc->io_kstats->ks_lock);
1393         }
1394 out:
1395 #ifdef DEBUG
1396         /* sum the q and check for sanity */
1397         {
1398                 nsc_size_t qblocks = 0;
1399                 uint64_t nitems = 0;
1400                 rdc_aio_t *a;
1401 
1402                 for (a = q->net_qhead; a != NULL; a = a->next) {
1403                         qblocks += a->len;
1404                         nitems++;
1405                 }
1406 
1407                 if (qblocks != q->blocks || nitems != q->nitems) {
1408                         cmn_err(CE_PANIC,
1409                             "rdc enqueue: q %p, q blocks %" NSC_SZFMT " (%"
1410                             NSC_SZFMT "), nitems %" NSC_SZFMT " (%"
1411                             NSC_SZFMT ")", (void *) q, q->blocks, qblocks,
1412                             q->nitems, nitems);
1413                 }
1414         }
1415 #endif
1416 
1417         mutex_exit(&q->net_qlock);
1418 
1419         if (q->nitems > q->nitems_hwm) {
1420                 q->nitems_hwm = q->nitems;
1421         }
1422 
1423         if (q->blocks > q->blocks_hwm) {
1424                 q->blocks_hwm = q->blocks;
1425         }
1426 
1427         if (!krdc->group->rdc_writer)
1428                 (void) rdc_writer(krdc->index);
1429 
1430         return (0);
1431 }
1432 
1433 int
1434 _rdc_enqueue_write(rdc_k_info_t *krdc, nsc_off_t pos, nsc_size_t len, int flag,
1435     nsc_buf_t *h)
1436 {
1437         rdc_aio_t *aio;
1438         rdc_group_t *group;
1439         rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
1440         int rc;
1441 
1442         aio = kmem_zalloc(sizeof (*aio), KM_NOSLEEP);
1443         if (!aio) {
1444                 return (ENOMEM);
1445         }
1446 
1447         group = krdc->group;
1448 
1449         aio->pos = pos;
1450         aio->qpos = -1;
1451         aio->len = len;
1452         aio->flag = flag;
1453         aio->index = krdc->index;
1454         aio->handle = h;
1455 
1456         if (group->flags & RDC_MEMQUE) {
1457                 return (rdc_memq_enqueue(krdc, aio));
1458         } else if ((group->flags & RDC_DISKQUE) &&
1459             !IS_STATE(urdc, RDC_DISKQ_FAILED)) {
1460                 rc = rdc_diskq_enqueue(krdc, aio);
1461                 kmem_free(aio, sizeof (*aio));
1462                 return (rc);
1463         }
1464         return (-1); /* keep lint quiet */
1465 }
1466 
1467 
1468 
1469 
1470 /*
1471  * Async Network RDC flusher
1472  */
1473 
1474 /*
1475  * don't allow any new writer threads to start if a member of the set
1476  * is disable pending
1477  */
1478 int
1479 is_disable_pending(rdc_k_info_t *krdc)
1480 {
1481         rdc_k_info_t *this = krdc;
1482         int rc = 0;
1483 
1484         do {
1485                 if (krdc->type_flag & RDC_DISABLEPEND) {
1486                         krdc = this;
1487                         rc = 1;
1488                         break;
1489                 }
1490                 krdc = krdc->group_next;
1491 
1492         } while (krdc != this);
1493 
1494         return (rc);
1495 }
1496 
1497 /*
1498  * rdc_writer -- spawn new writer if not running already
1499  *      called after enqueing the dirty blocks
1500  */
1501 int
1502 rdc_writer(int index)
1503 {
1504         rdc_k_info_t *krdc = &rdc_k_info[index];
1505         nsthread_t *t;
1506         rdc_group_t     *group;
1507         kmutex_t        *qlock;
1508         int tries;
1509         const int MAX_TRIES = 16;
1510 
1511         group = krdc->group;
1512 
1513         if (RDC_IS_DISKQ(group))
1514                 qlock = &group->diskq.disk_qlock;
1515         else
1516                 qlock = &group->ra_queue.net_qlock;
1517 
1518         mutex_enter(qlock);
1519 
1520 #ifdef DEBUG
1521         if (noflush) {
1522                 mutex_exit(qlock);
1523                 return (0);
1524         }
1525 #endif
1526 
1527         if ((group->rdc_writer) || is_disable_pending(krdc)) {
1528                 mutex_exit(qlock);
1529                 return (0);
1530         }
1531 
1532         if ((group->rdc_thrnum >= 1) && (group->seqack == RDC_NEWSEQ)) {
1533                 /*
1534                  * We also need to check if we are starting a new
1535                  * sequence, and if so don't create a new thread,
1536                  * as we must ensure that the start of new sequence
1537                  * requests arrives first to re-init the server.
1538                  */
1539                 mutex_exit(qlock);
1540                 return (0);
1541         }
1542         /*
1543          * For version 6,
1544          * see if we can fit in another thread.
1545          */
1546         group->rdc_thrnum++;
1547 
1548         if (krdc->intf && (krdc->intf->rpc_version >= RDC_VERSION6)) {
1549                 rdc_u_info_t *urdc = &rdc_u_info[index];
1550                 if (group->rdc_thrnum >= urdc->asyncthr)
1551                         group->rdc_writer = 1;
1552         } else {
1553                 group->rdc_writer = 1;
1554         }
1555 
1556         mutex_exit(qlock);
1557 
1558 
1559         /*
1560          * If we got here, we know that we have not exceeded the allowed
1561          * number of async threads for our group.  If we run out of threads
1562          * in _rdc_flset, we add a new thread to the set.
1563          */
1564         tries = 0;
1565         do {
1566                 /* first try to grab a thread from the free list */
1567                 if (t = nst_create(_rdc_flset, rdc_flusher_thread,
1568                     (blind_t)(unsigned long)index, 0)) {
1569                         break;
1570                 }
1571 
1572                 /* that failed; add a thread to the set and try again */
1573                 if (nst_add_thread(_rdc_flset, 1) != 1) {
1574                         cmn_err(CE_WARN, "!rdc_writer index %d nst_add_thread "
1575                             "error, tries: %d", index, tries);
1576                         break;
1577                 }
1578         } while (++tries < MAX_TRIES);
1579 
1580         if (tries) {
1581                 mutex_enter(&group->addthrnumlk);
1582                 group->rdc_addthrnum += tries;
1583                 mutex_exit(&group->addthrnumlk);
1584         }
1585 
1586         if (t) {
1587                 return (1);
1588         }
1589 
1590         cmn_err(CE_WARN, "!rdc_writer: index %d nst_create error", index);
1591         rdc_many_enter(krdc);
1592         mutex_enter(qlock);
1593         group->rdc_thrnum--;
1594         group->rdc_writer = 0;
1595         if ((group->count == 0) && (group->rdc_thrnum == 0)) {
1596                 mutex_exit(qlock);
1597                 /*
1598                  * Race with remove_from_group while write thread was
1599                  * failing to be created.
1600                  */
1601 #ifdef DEBUG
1602                 cmn_err(CE_WARN, "!rdc_writer: group being destroyed");
1603 #endif
1604                 rdc_delgroup(group);
1605                 krdc->group = NULL;
1606                 rdc_many_exit(krdc);
1607                 return (-1);
1608         }
1609         mutex_exit(qlock);
1610         rdc_many_exit(krdc);
1611         return (-1);
1612 }
1613 
1614 /*
1615  * Either we need to flush the
1616  * kmem (net_queue) queue or the disk (disk_queue)
1617  * determine which, and do it.
1618  */
1619 void
1620 rdc_flusher_thread(int index)
1621 {
1622         rdc_k_info_t *krdc = &rdc_k_info[index];
1623 
1624         if (krdc->group->flags & RDC_MEMQUE) {
1625                 rdc_flush_memq(index);
1626                 return;
1627         } else if (krdc->group->flags & RDC_DISKQUE) {
1628                 rdc_flush_diskq(index);
1629                 return;
1630         } else { /* uh-oh, big time */
1631                 cmn_err(CE_PANIC, "flusher trying to flush unknown queue type");
1632         }
1633 
1634 }
1635 
1636 void
1637 rdc_flush_memq(int index)
1638 {
1639         rdc_k_info_t *krdc = &rdc_k_info[index];
1640         rdc_aio_t *aio;
1641         net_queue *q;
1642         int dowork;
1643         rdc_group_t *group = krdc->group;
1644         if (!group || group->count == 0) {
1645 #ifdef DEBUG
1646                 cmn_err(CE_WARN, "!rdc_flush_memq: no group left!");
1647 #endif
1648                 return;
1649         }
1650 
1651         if (!krdc->c_fd) {
1652 #ifdef DEBUG
1653                 cmn_err(CE_WARN, "!rdc_flush_memq: no c_fd!");
1654 #endif
1655                 goto thread_death;
1656         }
1657 
1658 #ifdef DEBUG_DISABLE
1659         if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) {
1660                 cmn_err(CE_WARN, "!rdc_flush_memq: DISABLE PENDING!");
1661                 /*
1662                  * Need to continue as we may be trying to flush IO
1663                  * while trying to disable or suspend
1664                  */
1665         }
1666 #endif
1667 
1668         q = &group->ra_queue;
1669 
1670         dowork = 1;
1671         /* CONSTCOND */
1672         while (dowork) {
1673                 if (net_exit == ATM_EXIT)
1674                         break;
1675 
1676                 group = krdc->group;
1677                 if (!group || group->count == 0) {
1678 #ifdef DEBUG
1679                         cmn_err(CE_WARN, "!rdc_flush_memq: no group left!");
1680 #endif
1681                         break;
1682                 }
1683 
1684                 mutex_enter(&q->net_qlock);
1685                 aio = q->net_qhead;
1686 
1687                 if (aio == NULL) {
1688 #ifdef DEBUG
1689                         if (q->nitems != 0 ||
1690                             q->blocks != 0 ||
1691                             q->net_qtail != 0) {
1692                                 cmn_err(CE_PANIC,
1693                                     "rdc_flush_memq(1): q %p, q blocks %"
1694                                     NSC_SZFMT ", nitems %" NSC_SZFMT
1695                                     ", qhead %p qtail %p",
1696                                     (void *) q, q->blocks, q->nitems,
1697                                     (void *) aio, (void *) q->net_qtail);
1698                         }
1699 #endif
1700                         mutex_exit(&q->net_qlock);
1701                         break;
1702                 }
1703 
1704                 /* aio remove from q */
1705 
1706                 q->net_qhead = aio->next;
1707                 aio->next = NULL;
1708 
1709                 if (q->net_qtail == aio)
1710                         q->net_qtail = q->net_qhead;
1711 
1712                 q->blocks -= aio->len;
1713                 q->nitems--;
1714 
1715                 /*
1716                  * in flight numbers.
1717                  */
1718                 q->inflbls += aio->len;
1719                 q->inflitems++;
1720 
1721 #ifdef DEBUG
1722                 if (q->net_qhead == NULL) {
1723                         if (q->nitems != 0 ||
1724                             q->blocks != 0 ||
1725                             q->net_qtail != 0) {
1726                                 cmn_err(CE_PANIC,
1727                                     "rdc_flush_memq(2): q %p, q blocks %"
1728                                     NSC_SZFMT ", nitems %" NSC_SZFMT
1729                                     ", qhead %p qtail %p",
1730                                     (void *) q, q->blocks, q->nitems,
1731                                     (void *) q->net_qhead,
1732                                     (void *) q->net_qtail);
1733                         }
1734                 }
1735 
1736 #ifndef NSC_MULTI_TERABYTE
1737                 if (q->blocks < 0) {
1738                         cmn_err(CE_PANIC,
1739                             "rdc_flush_memq(3): q %p, q blocks %" NSC_SZFMT
1740                             ", nitems %d, qhead %p, qtail %p",
1741                             (void *) q, q->blocks, q->nitems,
1742                             (void *) q->net_qhead, (void *) q->net_qtail);
1743                 }
1744 #else
1745                 /* blocks and nitems are unsigned for NSC_MULTI_TERABYTE */
1746 #endif
1747 #endif
1748 
1749                 mutex_exit(&q->net_qlock);
1750 
1751                 aio->iostatus = RDC_IO_INIT;
1752 
1753                 _rdc_remote_flush(aio);
1754 
1755                 mutex_enter(&q->net_qlock);
1756                 q->inflbls -= aio->len;
1757                 q->inflitems--;
1758                 if ((group->seqack == RDC_NEWSEQ) &&
1759                     (group->seq != RDC_NEWSEQ + 1)) {
1760                         if ((q->net_qhead == NULL) ||
1761                             (q->net_qhead->seq != RDC_NEWSEQ + 1)) {
1762                                 /*
1763                                  * We are an old thread, and the
1764                                  * queue sequence has been reset
1765                                  * during the network write above.
1766                                  * As such we mustn't pull another
1767                                  * job from the queue until the
1768                                  * first sequence message has been ack'ed.
1769                                  * Just die instead. Unless this thread
1770                                  * is the first sequence that has just
1771                                  * been ack'ed
1772                                  */
1773                                 dowork = 0;
1774                         }
1775                 }
1776                 mutex_exit(&q->net_qlock);
1777 
1778                 if ((aio->iostatus != RDC_IO_DONE) && (group->count)) {
1779                         rdc_k_info_t *krdctmp = &rdc_k_info[aio->index];
1780                         if (krdctmp->type_flag & RDC_DISABLEPEND) {
1781                                 kmem_free(aio, sizeof (*aio));
1782                                 goto thread_death;
1783                         }
1784                         rdc_group_enter(krdc);
1785                         ASSERT(krdc->group);
1786                         rdc_group_log(krdc, RDC_NOFLUSH | RDC_ALLREMOTE,
1787                             "memq flush aio status not RDC_IO_DONE");
1788                         rdc_group_exit(krdc);
1789                         rdc_dump_queue(aio->index);
1790                 }
1791                 kmem_free(aio, sizeof (*aio));
1792 
1793                 if (krdc->remote_index < 0 || !krdc->lsrv || !krdc->intf)
1794                         break;
1795         }
1796 
1797 thread_death:
1798         rdc_many_enter(krdc);
1799         mutex_enter(&group->ra_queue.net_qlock);
1800         group->rdc_thrnum--;
1801         group->rdc_writer = 0;
1802         /*
1803          * all threads must be dead.
1804          */
1805         if ((group->count == 0) && (group->rdc_thrnum == 0)) {
1806                 mutex_exit(&group->ra_queue.net_qlock);
1807                 /*
1808                  * Group now empty, so destroy
1809                  * Race with remove_from_group while write thread was running
1810                  */
1811 #ifdef DEBUG
1812                 cmn_err(CE_WARN, "!rdc_flush_memq: group being destroyed");
1813 #endif
1814                 rdc_delgroup(group);
1815                 krdc->group = NULL;
1816                 rdc_many_exit(krdc);
1817                 return;
1818         }
1819         mutex_exit(&group->ra_queue.net_qlock);
1820         rdc_many_exit(krdc);
1821 }
1822 
1823 /*
1824  * rdc_flush_diskq
1825  * disk queue flusher
1826  */
1827 void
1828 rdc_flush_diskq(int index)
1829 {
1830         rdc_k_info_t *krdc = &rdc_k_info[index];
1831         rdc_u_info_t *urdc = &rdc_u_info[index];
1832         rdc_aio_t *aio = NULL;
1833         disk_queue *q;
1834         net_queue *nq;
1835         int dowork;
1836         int rc;
1837         rdc_group_t *group = krdc->group;
1838 
1839         if (!group || group->count == 0) {
1840 #ifdef DEBUG
1841                 cmn_err(CE_WARN, "!rdc_flush_diskq: no group left!");
1842 #endif
1843                 return;
1844         }
1845 
1846         if (!krdc->c_fd) {
1847 #ifdef DEBUG
1848                 cmn_err(CE_WARN, "!rdc_flush_diskq: no c_fd!");
1849 #endif
1850                 return;
1851         }
1852 
1853 #ifdef DEBUG_DISABLE
1854         if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) {
1855                 cmn_err(CE_WARN, "!rdc_flush_diskq: DISABLE PENDING!");
1856                 /*
1857                  * Need to continue as we may be trying to flush IO
1858                  * while trying to disable or suspend
1859                  */
1860         }
1861 #endif
1862         q = &group->diskq;
1863         nq = &group->ra_queue;
1864 
1865         if (IS_QSTATE(q, RDC_QDISABLEPEND) || IS_STATE(urdc, RDC_LOGGING)) {
1866 #ifdef DEBUG
1867                 cmn_err(CE_NOTE, "!flusher thread death 1 %x", QSTATE(q));
1868 #endif
1869                 goto thread_death;
1870         }
1871 
1872         dowork = 1;
1873         /* CONSTCOND */
1874         while (dowork) {
1875                 if (net_exit == ATM_EXIT)
1876                         break;
1877 
1878                 group = krdc->group;
1879                 if (!group || group->count == 0) {
1880 #ifdef DEBUG
1881                         cmn_err(CE_WARN, "!rdc_flush_diskq: no group left!");
1882 #endif
1883                         break;
1884                 }
1885 
1886                 do {
1887                         rc = 0;
1888                         if ((IS_STATE(urdc, RDC_LOGGING)) ||
1889                             (IS_STATE(urdc, RDC_SYNCING)) ||
1890                             (nq->qfflags & RDC_QFILLSLEEP))
1891                                 goto thread_death;
1892 
1893                         aio = rdc_dequeue(krdc, &rc);
1894 
1895                         if ((IS_STATE(urdc, RDC_LOGGING)) ||
1896                             (IS_STATE(urdc, RDC_SYNCING)) ||
1897                             (nq->qfflags & RDC_QFILLSLEEP)) {
1898                                 goto thread_death;
1899                         }
1900                         if (rc == EAGAIN) {
1901                                 delay(40);
1902                         }
1903 
1904                 } while (rc == EAGAIN);
1905 
1906                 if (aio == NULL) {
1907                         break;
1908                 }
1909 
1910                 aio->iostatus = RDC_IO_INIT;
1911 
1912                 mutex_enter(QLOCK(q));
1913                 q->inflbls += aio->len;
1914                 q->inflitems++;
1915                 mutex_exit(QLOCK(q));
1916 
1917                 _rdc_remote_flush(aio);
1918 
1919                 mutex_enter(QLOCK(q));
1920                 q->inflbls -= aio->len;
1921                 q->inflitems--;
1922 
1923                 if ((group->seqack == RDC_NEWSEQ) &&
1924                     (group->seq != RDC_NEWSEQ + 1)) {
1925                         if ((nq->net_qhead == NULL) ||
1926                             (nq->net_qhead->seq != RDC_NEWSEQ + 1)) {
1927                                 /*
1928                                  * We are an old thread, and the
1929                                  * queue sequence has been reset
1930                                  * during the network write above.
1931                                  * As such we mustn't pull another
1932                                  * job from the queue until the
1933                                  * first sequence message has been ack'ed.
1934                                  * Just die instead. Unless of course,
1935                                  * this thread is the first sequence that
1936                                  * has just been ack'ed.
1937                                  */
1938                                 dowork = 0;
1939                         }
1940                 }
1941                 mutex_exit(QLOCK(q));
1942 
1943                 if (aio->iostatus == RDC_IO_CANCELLED) {
1944                         rdc_dump_queue(aio->index);
1945                         kmem_free(aio, sizeof (*aio));
1946                         aio = NULL;
1947                         if (group) { /* seq gets bumped on dequeue */
1948                                 mutex_enter(QLOCK(q));
1949                                 rdc_dump_iohdrs(q);
1950                                 SET_QNXTIO(q, QHEAD(q));
1951                                 SET_QCOALBOUNDS(q, QHEAD(q));
1952                                 group->seq = RDC_NEWSEQ;
1953                                 group->seqack = RDC_NEWSEQ;
1954                                 mutex_exit(QLOCK(q));
1955                         }
1956                         break;
1957                 }
1958 
1959                 if ((aio->iostatus != RDC_IO_DONE) && (group->count)) {
1960                         rdc_k_info_t *krdctmp = &rdc_k_info[aio->index];
1961                         if (krdctmp->type_flag & RDC_DISABLEPEND) {
1962                                 kmem_free(aio, sizeof (*aio));
1963                                 aio = NULL;
1964                                 goto thread_death;
1965                         }
1966                         rdc_group_enter(krdc);
1967                         rdc_group_log(krdc,
1968                             RDC_NOFLUSH | RDC_ALLREMOTE | RDC_QUEUING,
1969                             "diskq flush aio status not RDC_IO_DONE");
1970                         rdc_group_exit(krdc);
1971                         rdc_dump_queue(aio->index);
1972                 }
1973 
1974                 kmem_free(aio, sizeof (*aio));
1975                 aio = NULL;
1976 
1977 #ifdef DEBUG_DISABLE
1978                 if (krdc->type_flag & RDC_DISABLEPEND) {
1979                         cmn_err(CE_WARN,
1980                             "!rdc_flush_diskq: DISABLE PENDING after IO!");
1981                 }
1982 #endif
1983                 if (krdc->remote_index < 0 || !krdc->lsrv || !krdc->intf)
1984                         break;
1985 
1986                 if (IS_QSTATE(q, RDC_QDISABLEPEND)) {
1987 #ifdef DEBUG
1988                         cmn_err(CE_NOTE, "!flusher thread death 2");
1989 #endif
1990                         break;
1991                 }
1992         }
1993 thread_death:
1994         rdc_many_enter(krdc);
1995         mutex_enter(QLOCK(q));
1996         group->rdc_thrnum--;
1997         group->rdc_writer = 0;
1998 
1999         if (aio && aio->qhandle) {
2000                 aio->qhandle->sb_user--;
2001                 if (aio->qhandle->sb_user == 0) {
2002                         (void) _rdc_rsrv_diskq(krdc->group);
2003                         rdc_fixlen(aio);
2004                         (void) nsc_free_buf(aio->qhandle);
2005                         aio->qhandle = NULL;
2006                         aio->handle = NULL;
2007                         _rdc_rlse_diskq(krdc->group);
2008                 }
2009         }
2010         if ((group->count == 0) && (group->rdc_thrnum == 0)) {
2011                 mutex_exit(QLOCK(q));
2012                 /*
2013                  * Group now empty, so destroy
2014                  * Race with remove_from_group while write thread was running
2015                  */
2016 #ifdef DEBUG
2017                 cmn_err(CE_WARN, "!rdc_flush_diskq: group being destroyed");
2018 #endif
2019                 mutex_enter(&group->diskqmutex);
2020                 rdc_close_diskq(group);
2021                 mutex_exit(&group->diskqmutex);
2022                 rdc_delgroup(group);
2023                 krdc->group = NULL;
2024                 rdc_many_exit(krdc);
2025                 return;
2026         }
2027         mutex_exit(QLOCK(q));
2028         rdc_many_exit(krdc);
2029 }
2030 
2031 /*
2032  * _rdc_remote_flush
2033  * Flush a single block ANON block
2034  * this function will flush from either the disk queue
2035  * or the memory queue. The appropriate locks must be
2036  * taken out etc, etc ...
2037  */
2038 static void
2039 _rdc_remote_flush(rdc_aio_t *aio)
2040 {
2041         rdc_k_info_t *krdc = &rdc_k_info[aio->index];
2042         rdc_u_info_t *urdc = &rdc_u_info[aio->index];
2043         disk_queue *q = &krdc->group->diskq;
2044         kmutex_t *qlock;
2045         rdc_group_t *group;
2046         nsc_buf_t *h = NULL;
2047         int reserved = 0;
2048         int rtype = RDC_RAW;
2049         int rc;
2050         uint_t maxseq;
2051         struct netwriteres netret;
2052         int waitq = 1;
2053         int vflags;
2054 
2055         group = krdc->group;
2056         netret.vecdata.vecdata_val = NULL;
2057         netret.vecdata.vecdata_len = 0;
2058 
2059         /* Where did we get this aio from anyway? */
2060         if (RDC_IS_DISKQ(group)) {
2061                 qlock = &group->diskq.disk_qlock;
2062         } else {
2063                 qlock = &group->ra_queue.net_qlock;
2064         }
2065 
2066         /*
2067          * quench transmission if we are too far ahead of the
2068          * server Q, or it will overflow.
2069          * Must fail all requests while asyncdis is set.
2070          * It will be cleared when the last thread to be discarded
2071          * sets the asyncstall counter to zero.
2072          * Note the thread within rdc_net_write
2073          * also bumps the asyncstall counter.
2074          */
2075 
2076         mutex_enter(qlock);
2077         if (group->asyncdis) {
2078                 aio->iostatus = RDC_IO_CANCELLED;
2079                 mutex_exit(qlock);
2080                 goto failed;
2081         }
2082         /* don't go to sleep if we have gone logging! */
2083         vflags = rdc_get_vflags(urdc);
2084         if ((vflags & (RDC_BMP_FAILED|RDC_VOL_FAILED|RDC_LOGGING))) {
2085                 if ((vflags & RDC_LOGGING) && RDC_IS_DISKQ(group))
2086                         aio->iostatus = RDC_IO_CANCELLED;
2087 
2088                 mutex_exit(qlock);
2089                 goto failed;
2090         }
2091 
2092         while (maxseq = group->seqack + RDC_MAXPENDQ + 1,
2093             maxseq = (maxseq < group->seqack) ? maxseq + RDC_NEWSEQ + 1
2094             : maxseq, !RDC_INFRONT(aio->seq, maxseq)) {
2095                 group->asyncstall++;
2096                 ASSERT(!IS_STATE(urdc, RDC_LOGGING));
2097                 cv_wait(&group->asyncqcv, qlock);
2098                 group->asyncstall--;
2099                 ASSERT(group->asyncstall >= 0);
2100                 if (group->asyncdis) {
2101                         if (group->asyncstall == 0) {
2102                                 group->asyncdis = 0;
2103                         }
2104                         aio->iostatus = RDC_IO_CANCELLED;
2105                         mutex_exit(qlock);
2106                         goto failed;
2107                 }
2108                 /*
2109                  * See if we have gone into logging mode
2110                  * since sleeping.
2111                  */
2112                 vflags = rdc_get_vflags(urdc);
2113                 if (vflags & (RDC_BMP_FAILED|RDC_VOL_FAILED|RDC_LOGGING)) {
2114                         if ((vflags & RDC_LOGGING) && RDC_IS_DISKQ(group))
2115                                 aio->iostatus = RDC_IO_CANCELLED;
2116 
2117                         mutex_exit(qlock);
2118                         goto failed;
2119                 }
2120         }
2121         mutex_exit(qlock);
2122 
2123         if ((krdc->io_kstats) && (!RDC_IS_DISKQ(krdc->group))) {
2124                 mutex_enter(krdc->io_kstats->ks_lock);
2125                 kstat_waitq_exit(KSTAT_IO_PTR(krdc->io_kstats));
2126                 mutex_exit(krdc->io_kstats->ks_lock);
2127                 waitq = 0;
2128         }
2129 
2130 
2131         rc = _rdc_rsrv_devs(krdc, rtype, RDC_INTERNAL);
2132         if (rc != 0) {
2133 #ifdef DEBUG
2134                 cmn_err(CE_WARN, "!_rdc_remote_flush: reserve, index %d, rc %d",
2135                     aio->index, rc);
2136 #endif
2137                 goto failed;
2138         }
2139 
2140         reserved = 1;
2141         /*
2142          * Case where we are multihop and calling with no ANON bufs
2143          * Need to do the read to fill the buf.
2144          */
2145         if (!aio->handle) {
2146                 rc = nsc_alloc_buf(RDC_U_FD(krdc), aio->pos, aio->len,
2147                     (aio->flag & ~NSC_WRITE) | NSC_READ, &h);
2148                 if (!RDC_SUCCESS(rc)) {
2149 #ifdef DEBUG
2150                         cmn_err(CE_WARN,
2151                             "!_rdc_remote_flush: alloc_buf, index %d, pos %"
2152                             NSC_SZFMT ", len %" NSC_SZFMT ", rc %d",
2153                             aio->index, aio->pos, aio->len, rc);
2154 #endif
2155 
2156                         goto failed;
2157                 }
2158                 aio->handle = h;
2159                 aio->handle->sb_user = RDC_NULLBUFREAD;
2160         }
2161 
2162         mutex_enter(qlock);
2163         if (group->asyncdis) {
2164                 if (group->asyncstall == 0) {
2165                         group->asyncdis = 0;
2166                 }
2167                 aio->iostatus = RDC_IO_CANCELLED;
2168                 mutex_exit(qlock);
2169                 goto failed;
2170         }
2171         group->asyncstall++;
2172         mutex_exit(qlock);
2173 
2174 
2175         if (krdc->remote_index < 0) {
2176                 /*
2177                  * this should be ok, we are flushing, not rev syncing.
2178                  * remote_index could be -1 if we lost a race with
2179                  * resume and the flusher trys to flush an io from
2180                  * another set that has not resumed
2181                  */
2182                 krdc->remote_index = rdc_net_state(krdc->index, CCIO_SLAVE);
2183                 DTRACE_PROBE1(remote_index_negative, int, krdc->remote_index);
2184 
2185         }
2186 
2187         /*
2188          * double check for logging, no check in net_write()
2189          * skip the write if you can, otherwise, if logging
2190          * avoid clearing the bit .. you don't know whose bit it may
2191          * also be.
2192          */
2193         if (IS_STATE(urdc, RDC_LOGGING) || IS_STATE(urdc, RDC_SYNCING)) {
2194                 aio->iostatus = RDC_IO_CANCELLED;
2195                 mutex_enter(qlock);
2196                 group->asyncstall--;
2197                 mutex_exit(qlock);
2198                 goto failed;
2199         }
2200 
2201         rc = rdc_net_write(krdc->index, krdc->remote_index,
2202             aio->handle, aio->pos, aio->len, aio->seq, aio->qpos, &netret);
2203 
2204         mutex_enter(qlock);
2205         group->asyncstall--;
2206         if (group->asyncdis) {
2207                 if (group->asyncstall == 0) {
2208                         group->asyncdis = 0;
2209                 }
2210                 aio->iostatus = RDC_IO_CANCELLED;
2211                 mutex_exit(qlock);
2212                 goto failed;
2213         }
2214 
2215         if (IS_STATE(urdc, RDC_LOGGING) || IS_STATE(urdc, RDC_SYNCING)) {
2216                 mutex_exit(qlock);
2217                 aio->iostatus = RDC_IO_CANCELLED;
2218                 goto failed;
2219         }
2220 
2221         ASSERT(aio->handle);
2222         if (rc != 0) {
2223 #ifdef DEBUG
2224                 cmn_err(CE_WARN,
2225                     "!_rdc_remote_flush: write, index %d, pos %" NSC_SZFMT
2226                     ", len %" NSC_SZFMT ", "
2227                     "rc %d seq %u group seq %u seqack %u qpos %" NSC_SZFMT,
2228                     aio->index, aio->pos, aio->len, rc, aio->seq,
2229                     group->seq, group->seqack, aio->qpos);
2230 #endif
2231                 if (rc == ENOLINK) {
2232                         cmn_err(CE_WARN,
2233                             "!Hard timeout detected (%d sec) "
2234                             "on SNDR set %s:%s",
2235                             rdc_rpc_tmout, urdc->secondary.intf,
2236                             urdc->secondary.file);
2237                 }
2238                 mutex_exit(qlock);
2239                 goto failed;
2240         } else {
2241                 aio->iostatus = RDC_IO_DONE;
2242         }
2243 
2244         if (RDC_IS_DISKQ(group)) {
2245                 /* free locally alloc'd handle */
2246                 if (aio->handle->sb_user == RDC_NULLBUFREAD) {
2247                         (void) nsc_free_buf(aio->handle);
2248                         aio->handle = NULL;
2249                 }
2250                 aio->qhandle->sb_user--;
2251                 if (aio->qhandle->sb_user == 0) {
2252                         (void) _rdc_rsrv_diskq(group);
2253                         rdc_fixlen(aio);
2254                         (void) nsc_free_buf(aio->qhandle);
2255                         aio->qhandle = NULL;
2256                         aio->handle = NULL;
2257                         _rdc_rlse_diskq(group);
2258                 }
2259 
2260         } else {
2261                 (void) nsc_free_buf(aio->handle);
2262                 aio->handle = NULL;
2263         }
2264 
2265         mutex_exit(qlock);
2266 
2267         _rdc_rlse_devs(krdc, rtype);
2268 
2269         if (netret.result == 0) {
2270                 vflags = rdc_get_vflags(urdc);
2271 
2272                 if (!(vflags & (RDC_BMP_FAILED|RDC_VOL_FAILED|RDC_LOGGING))) {
2273                         RDC_CLR_BITMAP(krdc, aio->pos, aio->len, \
2274                             0xffffffff, RDC_BIT_BUMP);
2275 
2276                         if (RDC_IS_DISKQ(krdc->group)) {
2277                                 if (!IS_STATE(urdc, RDC_LOGGING)) {
2278                                         /* tell queue data has been flushed */
2279                                         rdc_clr_iohdr(krdc, aio->qpos);
2280                                 } else { /* throw away queue, logging */
2281                                         mutex_enter(qlock);
2282                                         rdc_dump_iohdrs(q);
2283                                         SET_QNXTIO(q, QHEAD(q));
2284                                         SET_QCOALBOUNDS(q, QHEAD(q));
2285                                         mutex_exit(qlock);
2286                                 }
2287                         }
2288                 }
2289 
2290                 mutex_enter(qlock);
2291                 /*
2292                  * Check to see if the reply has arrived out of
2293                  * order, if so don't update seqack.
2294                  */
2295                 if (!RDC_INFRONT(aio->seq, group->seqack)) {
2296                         group->seqack = aio->seq;
2297                 }
2298 #ifdef DEBUG
2299                 else {
2300                         rdc_ooreply++;
2301                 }
2302 #endif
2303                 if (group->asyncstall) {
2304                         cv_broadcast(&group->asyncqcv);
2305                 }
2306                 mutex_exit(qlock);
2307         } else if (netret.result < 0) {
2308                 aio->iostatus = RDC_IO_FAILED;
2309         }
2310 
2311         /*
2312          * see if we have any pending async requests we can mark
2313          * as done.
2314          */
2315 
2316         if (netret.vecdata.vecdata_len) {
2317                 net_pendvec_t *vecp;
2318                 net_pendvec_t *vecpe;
2319                 vecp = netret.vecdata.vecdata_val;
2320                 vecpe = netret.vecdata.vecdata_val + netret.vecdata.vecdata_len;
2321                 while (vecp < vecpe) {
2322                         rdc_k_info_t *krdcp = &rdc_k_info[vecp->pindex];
2323                         rdc_u_info_t *urdcp = &rdc_u_info[vecp->pindex];
2324                         /*
2325                          * we must always still be in the same group.
2326                          */
2327                         ASSERT(krdcp->group == group);
2328                         vflags = rdc_get_vflags(urdcp);
2329 
2330                         if (!(vflags &
2331                             (RDC_BMP_FAILED|RDC_VOL_FAILED|RDC_LOGGING))) {
2332                                 RDC_CLR_BITMAP(krdcp, vecp->apos, vecp->alen, \
2333                                     0xffffffff, RDC_BIT_BUMP);
2334                                 if (RDC_IS_DISKQ(krdcp->group)) {
2335                                         if (!IS_STATE(urdc, RDC_LOGGING)) {
2336                                                 /* update queue info */
2337                                                 rdc_clr_iohdr(krdc, vecp->qpos);
2338                                         } else { /* we've gone logging */
2339                                                 mutex_enter(qlock);
2340                                                 rdc_dump_iohdrs(q);
2341                                                 SET_QNXTIO(q, QHEAD(q));
2342                                                 SET_QCOALBOUNDS(q, QHEAD(q));
2343                                                 mutex_exit(qlock);
2344                                         }
2345                                 }
2346                         }
2347 
2348                         /*
2349                          * see if we can re-start transmission
2350                          */
2351                         mutex_enter(qlock);
2352                         if (!RDC_INFRONT(vecp->seq, group->seqack)) {
2353                                 group->seqack = vecp->seq;
2354                         }
2355 #ifdef DEBUG
2356                         else {
2357                                 rdc_ooreply++;
2358                         }
2359 #endif
2360                         DTRACE_PROBE1(pendvec_return, int, vecp->seq);
2361 
2362                         if (group->asyncstall) {
2363                                 cv_broadcast(&group->asyncqcv);
2364                         }
2365                         mutex_exit(qlock);
2366                         vecp++;
2367                 }
2368         }
2369         if (netret.vecdata.vecdata_val)
2370                 kmem_free(netret.vecdata.vecdata_val,
2371                     netret.vecdata.vecdata_len * sizeof (net_pendvec_t));
2372         return;
2373 failed:
2374 
2375         /* perhaps we have a few threads stuck .. */
2376         if (group->asyncstall) {
2377                 group->asyncdis = 1;
2378                 cv_broadcast(&group->asyncqcv);
2379         }
2380         if (netret.vecdata.vecdata_val)
2381                 kmem_free(netret.vecdata.vecdata_val,
2382                     netret.vecdata.vecdata_len * sizeof (net_pendvec_t));
2383 
2384         mutex_enter(qlock);
2385         if (RDC_IS_DISKQ(group)) {
2386                 /* free locally alloc'd hanlde */
2387                 if ((aio->handle) &&
2388                     (aio->handle->sb_user == RDC_NULLBUFREAD)) {
2389                         (void) nsc_free_buf(aio->handle);
2390                         aio->handle = NULL;
2391                 }
2392                 aio->qhandle->sb_user--;
2393                 if (aio->qhandle->sb_user == 0) {
2394                         (void) _rdc_rsrv_diskq(group);
2395                         rdc_fixlen(aio);
2396                         (void) nsc_free_buf(aio->qhandle);
2397                         aio->qhandle = NULL;
2398                         aio->handle = NULL;
2399                         _rdc_rlse_diskq(group);
2400                 }
2401         } else {
2402                 if (aio->handle) {
2403                         (void) nsc_free_buf(aio->handle);
2404                         aio->handle = NULL;
2405                 }
2406         }
2407         mutex_exit(qlock);
2408 
2409         if (reserved) {
2410                 _rdc_rlse_devs(krdc, rtype);
2411         }
2412 
2413         if ((waitq && krdc->io_kstats) && (!RDC_IS_DISKQ(krdc->group))) {
2414                 mutex_enter(krdc->io_kstats->ks_lock);
2415                 kstat_waitq_exit(KSTAT_IO_PTR(krdc->io_kstats));
2416                 mutex_exit(krdc->io_kstats->ks_lock);
2417         }
2418 
2419         /* make sure that the bit is still set */
2420         RDC_CHECK_BIT(krdc, aio->pos, aio->len);
2421 
2422         if (aio->iostatus != RDC_IO_CANCELLED)
2423                 aio->iostatus = RDC_IO_FAILED;
2424 }
2425 
2426 
2427 /*
2428  * rdc_drain_disk_queue
2429  * drain the async network queue for the whole group. Bail out if nothing
2430  * happens in 20 sec
2431  * returns -1 if it bails before the queues are drained.
2432  */
2433 #define NUM_RETRIES     15      /* Number of retries to wait if no progress */
2434 int
2435 rdc_drain_disk_queue(int index)
2436 {
2437         rdc_k_info_t *krdc = &rdc_k_info[index];
2438         volatile rdc_group_t *group;
2439         volatile disk_queue *diskq;
2440         int threads, counter;
2441         long blocks;
2442 
2443         /* Sanity checking */
2444         if (index > rdc_max_sets)
2445                 return (0);
2446 
2447         /*
2448          * If there is no group or diskq configured, we can leave now
2449          */
2450         if (!(group = krdc->group) || !(diskq = &group->diskq))
2451                 return (0);
2452 
2453         /*
2454          * No need to wait if EMPTY and threads are gone
2455          */
2456         counter = 0;
2457         while (!QEMPTY(diskq) || group->rdc_thrnum) {
2458 
2459                 /*
2460                  * Capture counters to determine if progress is being made
2461                  */
2462                 blocks = QBLOCKS(diskq);
2463                 threads = group->rdc_thrnum;
2464 
2465                 /*
2466                  * Wait
2467                  */
2468                 delay(HZ);
2469 
2470                 /*
2471                  * Has the group or disk queue gone away while delayed?
2472                  */
2473                 if (!(group = krdc->group) || !(diskq = &group->diskq))
2474                         return (0);
2475 
2476                 /*
2477                  * Are we still seeing progress?
2478                  */
2479                 if (blocks == QBLOCKS(diskq) && threads == group->rdc_thrnum) {
2480                         /*
2481                          * No progress seen, increment retry counter
2482                          */
2483                         if (counter++ > NUM_RETRIES) {
2484                                 return (-1);
2485                         }
2486                 } else {
2487                         /*
2488                          * Reset counter, as we've made progress
2489                          */
2490                         counter = 0;
2491                 }
2492         }
2493 
2494         return (0);
2495 }
2496 
2497 /*
2498  * decide what needs to be drained, disk or core
2499  * and drain it
2500  */
2501 int
2502 rdc_drain_queue(int index)
2503 {
2504         rdc_k_info_t *krdc = &rdc_k_info[index];
2505         rdc_group_t *group = krdc->group;
2506 
2507         if (!group)
2508                 return (0);
2509 
2510         if (RDC_IS_DISKQ(group))
2511                 return (rdc_drain_disk_queue(index));
2512         if (RDC_IS_MEMQ(group))
2513                 return (rdc_drain_net_queue(index));
2514         /* oops.. */
2515 #ifdef DEBUG
2516         cmn_err(CE_WARN, "!rdc_drain_queue: "
2517             "attempting drain of unknown Q type");
2518 #endif
2519         return (0);
2520 }
2521 
2522 /*
2523  * rdc_drain_net_queue
2524  * drain the async network queue for the whole group. Bail out if nothing
2525  * happens in 20 sec
2526  * returns -1 if it bails before the queues are drained.
2527  */
2528 int
2529 rdc_drain_net_queue(int index)
2530 {
2531         rdc_k_info_t *krdc = &rdc_k_info[index];
2532         volatile net_queue *q;
2533         int bail = 20;  /* bail out in about 20 secs */
2534         nsc_size_t blocks;
2535 
2536         /* Sanity checking */
2537         if (index > rdc_max_sets)
2538                 return (0);
2539         if (!krdc->group)
2540                 return (0);
2541         /* LINTED */
2542         if (!(q = &krdc->group->ra_queue))
2543                 return (0);
2544 
2545         /* CONSTCOND */
2546         while (1) {
2547 
2548                 if (((volatile rdc_aio_t *)q->net_qhead == NULL) &&
2549                     (krdc->group->rdc_thrnum == 0)) {
2550                         break;
2551                 }
2552 
2553                 blocks = q->blocks;
2554 
2555                 q = (volatile net_queue *)&krdc->group->ra_queue;
2556 
2557                 if ((blocks == q->blocks) &&
2558                     (--bail <= 0)) {
2559                         break;
2560                 }
2561 
2562                 delay(HZ);
2563         }
2564 
2565         if (bail <= 0)
2566                 return (-1);
2567 
2568         return (0);
2569 }
2570 
2571 /*
2572  * rdc_dump_queue
2573  * We want to release all the blocks currently on the network flushing queue
2574  * We already have them logged in the bitmap.
2575  */
2576 void
2577 rdc_dump_queue(int index)
2578 {
2579         rdc_k_info_t *krdc = &rdc_k_info[index];
2580         rdc_aio_t *aio;
2581         net_queue *q;
2582         rdc_group_t *group;
2583         disk_queue *dq;
2584         kmutex_t *qlock;
2585 
2586         group = krdc->group;
2587 
2588         q = &group->ra_queue;
2589         dq = &group->diskq;
2590 
2591         /*
2592          * gotta have both locks here for diskq
2593          */
2594 
2595         if (RDC_IS_DISKQ(group)) {
2596                 mutex_enter(&q->net_qlock);
2597                 if (q->qfill_sleeping == RDC_QFILL_AWAKE) {
2598                         int tries = 3;
2599 #ifdef DEBUG_DISKQ
2600                         cmn_err(CE_NOTE,
2601                             "!dumpq sending diskq->memq flusher to sleep");
2602 #endif
2603                         q->qfflags |= RDC_QFILLSLEEP;
2604                         mutex_exit(&q->net_qlock);
2605                         while (q->qfill_sleeping == RDC_QFILL_AWAKE && tries--)
2606                                 delay(5);
2607                         mutex_enter(&q->net_qlock);
2608                 }
2609         }
2610 
2611         if (RDC_IS_DISKQ(group)) {
2612                 qlock = &dq->disk_qlock;
2613                 (void) _rdc_rsrv_diskq(group);
2614         } else {
2615                 qlock = &q->net_qlock;
2616         }
2617 
2618         mutex_enter(qlock);
2619 
2620         group->seq = RDC_NEWSEQ;     /* reset the sequence number */
2621         group->seqack = RDC_NEWSEQ;
2622 
2623         /* if the q is on disk, dump the q->iohdr chain */
2624         if (RDC_IS_DISKQ(group)) {
2625                 rdc_dump_iohdrs(dq);
2626 
2627                 /* back up the nxtio pointer */
2628                 SET_QNXTIO(dq, QHEAD(dq));
2629                 SET_QCOALBOUNDS(dq, QHEAD(dq));
2630         }
2631 
2632         while (q->net_qhead) {
2633                 rdc_k_info_t *tmpkrdc;
2634                 aio = q->net_qhead;
2635                 tmpkrdc = &rdc_k_info[aio->index];
2636 
2637                 if (RDC_IS_DISKQ(group)) {
2638                         aio->qhandle->sb_user--;
2639                         if (aio->qhandle->sb_user == 0) {
2640                                 rdc_fixlen(aio);
2641                                 (void) nsc_free_buf(aio->qhandle);
2642                                 aio->qhandle = NULL;
2643                                 aio->handle = NULL;
2644                         }
2645                 } else {
2646                         if (aio->handle) {
2647                                 (void) nsc_free_buf(aio->handle);
2648                                 aio->handle = NULL;
2649                         }
2650                 }
2651 
2652                 q->net_qhead = aio->next;
2653                 RDC_CHECK_BIT(tmpkrdc, aio->pos, aio->len);
2654 
2655                 kmem_free(aio, sizeof (*aio));
2656                 if (tmpkrdc->io_kstats && !RDC_IS_DISKQ(group)) {
2657                         mutex_enter(tmpkrdc->io_kstats->ks_lock);
2658                         kstat_waitq_exit(KSTAT_IO_PTR(tmpkrdc->io_kstats));
2659                         mutex_exit(tmpkrdc->io_kstats->ks_lock);
2660                 }
2661 
2662         }
2663 
2664         q->net_qtail = NULL;
2665         q->blocks = 0;
2666         q->nitems = 0;
2667 
2668         /*
2669          * See if we have stalled threads.
2670          */
2671 done:
2672         if (group->asyncstall) {
2673                 group->asyncdis = 1;
2674                 cv_broadcast(&group->asyncqcv);
2675         }
2676         mutex_exit(qlock);
2677         if (RDC_IS_DISKQ(group)) {
2678                 mutex_exit(&q->net_qlock);
2679                 _rdc_rlse_diskq(group);
2680         }
2681 
2682 }
2683 
2684 
2685 /*
2686  * rdc_clnt_get
2687  * Get a CLIENT handle and cache it
2688  */
2689 
2690 static int
2691 rdc_clnt_get(rdc_srv_t *svp, rpcvers_t vers, struct chtab **rch, CLIENT **clp)
2692 {
2693         uint_t  max_msgsize;
2694         int     retries;
2695         int ret;
2696         struct cred             *cred;
2697         int num_clnts = 0;
2698         register struct chtab *ch;
2699         struct chtab **plistp;
2700         CLIENT *client = 0;
2701 
2702         if (rch) {
2703                 *rch = 0;
2704         }
2705 
2706         if (clp) {
2707                 *clp = 0;
2708         }
2709 
2710         retries = 6;    /* Never used for COTS in Solaris */
2711         cred = ddi_get_cred();
2712         max_msgsize = RDC_RPC_MAX;
2713 
2714         mutex_enter(&rdc_clnt_lock);
2715 
2716         ch = rdc_chtable;
2717         plistp = &rdc_chtable;
2718 
2719         /* find the right ch_list chain */
2720 
2721         for (ch = rdc_chtable; ch != NULL; ch = ch->ch_next) {
2722                 if (ch->ch_prog == RDC_PROGRAM &&
2723                     ch->ch_vers == vers &&
2724                     ch->ch_dev == svp->ri_knconf->knc_rdev &&
2725                     ch->ch_protofmly != NULL &&
2726                     strcmp(ch->ch_protofmly,
2727                     svp->ri_knconf->knc_protofmly) == 0) {
2728                         /* found the correct chain to walk */
2729                         break;
2730                 }
2731                 plistp = &ch->ch_next;
2732         }
2733 
2734         if (ch != NULL) {
2735                 /* walk the ch_list and try and find a free client */
2736 
2737                 for (num_clnts = 0; ch != NULL; ch = ch->ch_list, num_clnts++) {
2738                         if (ch->ch_inuse == FALSE) {
2739                                 /* suitable handle to reuse */
2740                                 break;
2741                         }
2742                         plistp = &ch->ch_list;
2743                 }
2744         }
2745 
2746         if (ch == NULL && num_clnts >= MAXCLIENTS) {
2747                 /* alloc a temporary handle and return */
2748 
2749                 rdc_clnt_toomany++;
2750                 mutex_exit(&rdc_clnt_lock);
2751 
2752                 ret = clnt_tli_kcreate(svp->ri_knconf, &(svp->ri_addr),
2753                     RDC_PROGRAM, vers, max_msgsize, retries, cred, &client);
2754 
2755                 if (ret != 0) {
2756                         cmn_err(CE_NOTE,
2757                             "!rdc_call: tli_kcreate failed %d", ret);
2758                         return (ret);
2759                 }
2760 
2761                 *rch = 0;
2762                 *clp = client;
2763                 (void) CLNT_CONTROL(client, CLSET_PROGRESS, NULL);
2764                 return (ret);
2765         }
2766 
2767         if (ch != NULL) {
2768                 /* reuse a cached handle */
2769 
2770                 ch->ch_inuse = TRUE;
2771                 ch->ch_timesused++;
2772                 mutex_exit(&rdc_clnt_lock);
2773 
2774                 *rch = ch;
2775 
2776                 if (ch->ch_client == NULL) {
2777                         ret = clnt_tli_kcreate(svp->ri_knconf, &(svp->ri_addr),
2778                             RDC_PROGRAM, vers, max_msgsize, retries,
2779                             cred, &ch->ch_client);
2780                         if (ret != 0) {
2781                                 ch->ch_inuse = FALSE;
2782                                 return (ret);
2783                         }
2784 
2785                         (void) CLNT_CONTROL(ch->ch_client, CLSET_PROGRESS,
2786                             NULL);
2787                         *clp = ch->ch_client;
2788 
2789                         return (0);
2790                 } else {
2791                 /*
2792                  * Consecutive calls to CLNT_CALL() on the same client handle
2793                  * get the same transaction ID.  We want a new xid per call,
2794                  * so we first reinitialise the handle.
2795                  */
2796                         (void) clnt_tli_kinit(ch->ch_client, svp->ri_knconf,
2797                             &(svp->ri_addr), max_msgsize, retries, cred);
2798 
2799                         *clp = ch->ch_client;
2800                         return (0);
2801                 }
2802         }
2803 
2804         /* create new handle and cache it */
2805         ch = (struct chtab *)kmem_zalloc(sizeof (*ch), KM_SLEEP);
2806 
2807         if (ch) {
2808                 ch->ch_inuse = TRUE;
2809                 ch->ch_prog = RDC_PROGRAM;
2810                 ch->ch_vers = vers;
2811                 ch->ch_dev = svp->ri_knconf->knc_rdev;
2812                 ch->ch_protofmly = (char *)kmem_zalloc(
2813                     strlen(svp->ri_knconf->knc_protofmly)+1, KM_SLEEP);
2814                 if (ch->ch_protofmly)
2815                         (void) strcpy(ch->ch_protofmly,
2816                             svp->ri_knconf->knc_protofmly);
2817                 *plistp = ch;
2818         }
2819 
2820         mutex_exit(&rdc_clnt_lock);
2821 
2822         ret = clnt_tli_kcreate(svp->ri_knconf, &(svp->ri_addr),
2823             RDC_PROGRAM, vers, max_msgsize, retries, cred, clp);
2824 
2825         if (ret != 0) {
2826                 if (ch)
2827                         ch->ch_inuse = FALSE;
2828                 cmn_err(CE_NOTE, "!rdc_call: tli_kcreate failed %d", ret);
2829                 return (ret);
2830         }
2831 
2832         *rch = ch;
2833         if (ch)
2834                 ch->ch_client = *clp;
2835 
2836         (void) CLNT_CONTROL(*clp, CLSET_PROGRESS, NULL);
2837 
2838         return (ret);
2839 }
2840 
2841 
2842 long rdc_clnt_count = 0;
2843 
2844 /*
2845  * rdc_clnt_call
2846  * Arguments:
2847  *      rdc_srv_t *svp - rdc servinfo
2848  *      rpcproc_t proc; - rpcid
2849  *      rpcvers_t vers; - protocol version
2850  *      xdrproc_t xargs;- xdr function
2851  *      caddr_t argsp;- args to xdr function
2852  *      xdrproc_t xres;- xdr function
2853  *      caddr_t resp;- args to xdr function
2854  *      struct timeval timeout;
2855  * Performs RPC client call using specific protocol and version
2856  */
2857 
2858 int
2859 rdc_clnt_call(rdc_srv_t *svp, rpcproc_t proc, rpcvers_t vers,
2860                 xdrproc_t xargs, caddr_t argsp,
2861                 xdrproc_t xres, caddr_t resp, struct timeval *timeout)
2862 {
2863         CLIENT *rh = NULL;
2864         int err;
2865         int tries = 0;
2866         struct chtab *ch = NULL;
2867 
2868         err = rdc_clnt_get(svp, vers, &ch, &rh);
2869         if (err || !rh)
2870                 return (err);
2871 
2872         do {
2873                 DTRACE_PROBE3(rdc_clnt_call_1,
2874                     CLIENT *, rh, rpcproc_t, proc, xdrproc_t, xargs);
2875 
2876                 err = cl_call_sig(rh, proc, xargs, argsp, xres, resp, *timeout);
2877 
2878                 DTRACE_PROBE1(rdc_clnt_call_end, int, err);
2879 
2880                 switch (err) {
2881                         case RPC_SUCCESS: /* bail now */
2882                                 goto done;
2883                         case RPC_INTR:  /* No recovery from this */
2884                                 goto done;
2885                         case RPC_PROGVERSMISMATCH:
2886                                 goto done;
2887                         case RPC_TLIERROR:
2888                                 /* fall thru */
2889                         case RPC_XPRTFAILED:
2890                                 /* Delay here to err on side of caution */
2891                                 /* fall thru */
2892                         case RPC_VERSMISMATCH:
2893 
2894                         default:
2895                                 if (IS_UNRECOVERABLE_RPC(err)) {
2896                                         goto done;
2897                                 }
2898                                 tries++;
2899                         /*
2900                          * The call is in progress (over COTS)
2901                          * Try the CLNT_CALL again, but don't
2902                          * print a noisy error message
2903                          */
2904                                 if (err == RPC_INPROGRESS)
2905                                         break;
2906                                 cmn_err(CE_NOTE, "!SNDR client: err %d %s",
2907                                     err, clnt_sperrno(err));
2908                         }
2909         } while (tries && (tries < 2));
2910 done:
2911         ++rdc_clnt_count;
2912         rdc_clnt_free(ch, rh);
2913         return (err);
2914 }
2915 
2916 
2917 /*
2918  * Call an rpc from the client side, not caring which protocol is used.
2919  */
2920 int
2921 rdc_clnt_call_any(rdc_srv_t *svp, rdc_if_t *ip, rpcproc_t proc,
2922                 xdrproc_t xargs, caddr_t argsp,
2923                 xdrproc_t xres, caddr_t resp, struct timeval *timeout)
2924 {
2925         rpcvers_t vers;
2926         int rc;
2927 
2928         if (ip != NULL) {
2929                 vers = ip->rpc_version;
2930         } else {
2931                 vers = RDC_VERS_MAX;
2932         }
2933 
2934         do {
2935                 rc = rdc_clnt_call(svp, proc, vers, xargs, argsp,
2936                     xres, resp, timeout);
2937 
2938                 if (rc == RPC_PROGVERSMISMATCH) {
2939                         /*
2940                          * Downgrade and try again.
2941                          */
2942                         vers--;
2943                 }
2944         } while ((vers >= RDC_VERS_MIN) && (rc == RPC_PROGVERSMISMATCH));
2945 
2946         if ((rc == 0) && (ip != NULL) && (vers != ip->rpc_version)) {
2947                 mutex_enter(&rdc_ping_lock);
2948                 ip->rpc_version = vers;
2949                 mutex_exit(&rdc_ping_lock);
2950         }
2951 
2952         return (rc);
2953 }
2954 
2955 /*
2956  * Call an rpc from the client side, starting with protocol specified
2957  */
2958 int
2959 rdc_clnt_call_walk(rdc_k_info_t *krdc, rpcproc_t proc, xdrproc_t xargs,
2960                 caddr_t argsp, xdrproc_t xres, caddr_t resp,
2961                 struct timeval *timeout)
2962 {
2963         int rc;
2964         rpcvers_t vers;
2965         rdc_srv_t *svp = krdc->lsrv;
2966         rdc_if_t *ip = krdc->intf;
2967         vers = krdc->rpc_version;
2968 
2969         do {
2970                 rc = rdc_clnt_call(svp, proc, vers, xargs, argsp,
2971                     xres, resp, timeout);
2972 
2973                 if (rc == RPC_PROGVERSMISMATCH) {
2974                         /*
2975                          * Downgrade and try again.
2976                          */
2977                         vers--;
2978                 }
2979         } while ((vers >= RDC_VERS_MIN) && (rc == RPC_PROGVERSMISMATCH));
2980 
2981         if ((rc == 0) && (ip != NULL) && (vers != ip->rpc_version)) {
2982                 mutex_enter(&rdc_ping_lock);
2983                 ip->rpc_version = vers;
2984                 mutex_exit(&rdc_ping_lock);
2985         }
2986 
2987         return (rc);
2988 }
2989 
2990 /*
2991  * rdc_clnt_free
2992  * Free a client structure into the cache, or if this was a temporary
2993  * handle allocated above MAXCLIENTS, destroy it.
2994  */
2995 static void
2996 rdc_clnt_free(struct chtab *ch, CLIENT *clp)
2997 {
2998         if (ch != NULL) {
2999                 /* cached client, just clear inuse flag and return */
3000                 ASSERT(ch->ch_client == clp);
3001                 ch->ch_inuse = FALSE;
3002                 return;
3003         }
3004 
3005         /* temporary handle allocated above MAXCLIENTS, so destroy it */
3006 
3007         if (clp->cl_auth) {
3008                 AUTH_DESTROY(clp->cl_auth);
3009                 clp->cl_auth = 0;
3010         }
3011 
3012         CLNT_DESTROY(clp);
3013 }
3014 
3015 
3016 /*
3017  * _rdc_clnt_destroy
3018  * Free a chain (ch_list or ch_next) of cached clients
3019  */
3020 static int
3021 _rdc_clnt_destroy(struct chtab **p, const int list)
3022 {
3023         struct chtab *ch;
3024         int leak = 0;
3025 
3026         if (!p)
3027                 return (0);
3028 
3029         while (*p != NULL) {
3030                 ch = *p;
3031 
3032                 /*
3033                  * unlink from the chain
3034                  * - this leaks the client if it was inuse
3035                  */
3036 
3037                 *p = list ? ch->ch_list : ch->ch_next;
3038 
3039                 if (!ch->ch_inuse) {
3040                         /* unused client - destroy it */
3041 
3042                         if (ch->ch_client) {
3043                                 if (ch->ch_client->cl_auth) {
3044                                         AUTH_DESTROY(ch->ch_client->cl_auth);
3045                                         ch->ch_client->cl_auth = 0;
3046                                 }
3047 
3048                                 CLNT_DESTROY(ch->ch_client);
3049                                 ch->ch_client = 0;
3050                         }
3051 
3052                         if (ch->ch_protofmly)
3053                                 kmem_free(ch->ch_protofmly,
3054                                     strlen(ch->ch_protofmly)+1);
3055 
3056                         kmem_free(ch, sizeof (*ch));
3057                 } else {
3058                         /* remember client leak */
3059                         leak++;
3060                 }
3061         }
3062 
3063         return (leak);
3064 }
3065 
3066 
3067 /*
3068  * rdc_clnt_destroy
3069  * Free client caching table on unconfigure
3070  */
3071 void
3072 rdc_clnt_destroy(void)
3073 {
3074         struct chtab *ch;
3075         int leak = 0;
3076 
3077         mutex_enter(&rdc_clnt_lock);
3078 
3079         /* destroy each ch_list chain */
3080 
3081         for (ch = rdc_chtable; ch; ch = ch->ch_next) {
3082                 leak += _rdc_clnt_destroy(&ch->ch_list, 1);
3083         }
3084 
3085         /* destroy the main ch_next chain */
3086         leak += _rdc_clnt_destroy(&rdc_chtable, 0);
3087 
3088         if (leak) {
3089                 /* we are about to leak clients */
3090                 cmn_err(CE_WARN,
3091                     "!rdc_clnt_destroy: leaking %d inuse clients", leak);
3092         }
3093 
3094         mutex_exit(&rdc_clnt_lock);
3095 }
3096 
3097 #ifdef  DEBUG
3098 /*
3099  * Function to send an asynchronous net_data6 request
3100  * direct to a server to allow the generation of
3101  * out of order requests for ZatoIchi tests.
3102  */
3103 int
3104 rdc_async6(void *arg, int mode, int *rvp)
3105 {
3106         int                     index;
3107         rdc_async6_t            async6;
3108         struct net_data6        data6;
3109         rdc_k_info_t            *krdc;
3110         rdc_u_info_t            *urdc;
3111         char                    *data;
3112         int                     datasz;
3113         char                    *datap;
3114         int                     rc;
3115         struct timeval          t;
3116         struct netwriteres      netret;
3117         int i;
3118 
3119         rc = 0;
3120         *rvp = 0;
3121         /*
3122          * copyin the user's arguments.
3123          */
3124         if (ddi_copyin(arg, &async6, sizeof (async6), mode) < 0) {
3125                 return (EFAULT);
3126         }
3127 
3128         /*
3129          * search by the secondary host and file.
3130          */
3131         mutex_enter(&rdc_conf_lock);
3132         for (index = 0; index < rdc_max_sets; index++) {
3133                 urdc = &rdc_u_info[index];
3134                 krdc = &rdc_k_info[index];
3135 
3136                 if (!IS_CONFIGURED(krdc))
3137                         continue;
3138                 if (!IS_ENABLED(urdc))
3139                         continue;
3140                 if (!IS_ASYNC(urdc))
3141                         continue;
3142                 if (krdc->rpc_version < RDC_VERSION6)
3143                         continue;
3144 
3145                 if ((strncmp(urdc->secondary.intf, async6.sechost,
3146                     MAX_RDC_HOST_SIZE) == 0) &&
3147                     (strncmp(urdc->secondary.file, async6.secfile,
3148                     NSC_MAXPATH) == 0)) {
3149                         break;
3150                 }
3151         }
3152         mutex_exit(&rdc_conf_lock);
3153         if (index >= rdc_max_sets) {
3154                 return (ENOENT);
3155         }
3156 
3157         if (async6.spos != -1) {
3158                 if ((async6.spos < async6.pos) ||
3159                     ((async6.spos + async6.slen) >
3160                     (async6.pos + async6.len))) {
3161                         cmn_err(CE_WARN, "!Sub task not within range "
3162                             "start %d length %d sub start %d sub length %d",
3163                             async6.pos, async6.len, async6.spos, async6.slen);
3164                         return (EIO);
3165                 }
3166         }
3167 
3168         datasz = FBA_SIZE(1);
3169         data = kmem_alloc(datasz, KM_SLEEP);
3170         datap = data;
3171         while (datap < &data[datasz]) {
3172                 /* LINTED */
3173                 *datap++ = async6.pat;
3174         }
3175 
3176         /*
3177          * Fill in the net databuffer prior to transmission.
3178          */
3179 
3180         data6.local_cd = krdc->index;
3181         if (krdc->remote_index == -1) {
3182                 cmn_err(CE_WARN, "!Remote index not known");
3183                 kmem_free(data, datasz);
3184                 return (EIO);
3185         } else {
3186                 data6.cd = krdc->remote_index;
3187         }
3188         data6.pos = async6.pos;
3189         data6.len = async6.len;
3190         data6.flag = 0;
3191         data6.idx = async6.idx;
3192         data6.seq = async6.seq;
3193 
3194         if (async6.spos == -1) {
3195                 data6.sfba = async6.pos;
3196                 data6.nfba = async6.len;
3197                 data6.endoblk = 1;
3198 
3199         } else {
3200                 data6.sfba = async6.spos;
3201                 data6.nfba = async6.slen;
3202                 data6.endoblk = async6.endind;
3203         }
3204 
3205         data6.data.data_len = datasz;
3206         data6.data.data_val = data;
3207 
3208         t.tv_sec = rdc_rpc_tmout;
3209         t.tv_usec = 0;
3210 
3211         netret.vecdata.vecdata_val = NULL;
3212         netret.vecdata.vecdata_len = 0;
3213 
3214 
3215         rc = rdc_clnt_call(krdc->lsrv, RDCPROC_WRITE6, krdc->rpc_version,
3216             xdr_net_data6, (char *)&data6, xdr_netwriteres, (char *)&netret,
3217             &t);
3218 
3219         kmem_free(data, datasz);
3220         if (rc == 0) {
3221                 if (netret.result < 0) {
3222                         rc = -netret.result;
3223                 }
3224                 cmn_err(CE_NOTE, "!async6: seq %u result %d index %d "
3225                     "pendcnt %d",
3226                     netret.seq, netret.result, netret.index,
3227                     netret.vecdata.vecdata_len);
3228                 for (i = 0; i < netret.vecdata.vecdata_len; i++) {
3229                         net_pendvec_t pvec;
3230                         bcopy(netret.vecdata.vecdata_val + i, &pvec,
3231                             sizeof (net_pendvec_t));
3232                         cmn_err(CE_NOTE, "!Seq %u pos %llu len %llu",
3233                             pvec.seq, (unsigned long long)pvec.apos,
3234                             (unsigned long long)pvec.alen);
3235                 }
3236                 if (netret.vecdata.vecdata_val)
3237                         kmem_free(netret.vecdata.vecdata_val,
3238                             netret.vecdata.vecdata_len *
3239                             sizeof (net_pendvec_t));
3240         } else {
3241                 cmn_err(CE_NOTE, "!async6: rpc call failed %d", rc);
3242         }
3243         *rvp = netret.index;
3244         return (rc);
3245 }
3246 
3247 /*
3248  * Function to send an net_read6 request
3249  * direct to a server to allow the generation of
3250  * read requests.
3251  */
3252 int
3253 rdc_readgen(void *arg, int mode, int *rvp)
3254 {
3255         int                     index;
3256         rdc_readgen_t           readgen;
3257         rdc_readgen32_t         readgen32;
3258         struct rread6           read6;
3259         struct rread            read5;
3260         rdc_k_info_t            *krdc;
3261         int                     ret;
3262         struct timeval          t;
3263         struct rdcrdresult      rr;
3264         int                     err;
3265 
3266         *rvp = 0;
3267         rr.rr_bufsize = 0;      /* rpc data buffer length (bytes) */
3268         rr.rr_data = NULL;      /* rpc data buffer */
3269         if (ddi_model_convert_from(mode & FMODELS) == DDI_MODEL_ILP32) {
3270                 if (ddi_copyin(arg, &readgen32, sizeof (readgen32), mode)) {
3271                         return (EFAULT);
3272                 }
3273                 (void) strncpy(readgen.sechost, readgen32.sechost,
3274                     MAX_RDC_HOST_SIZE);
3275                 (void) strncpy(readgen.secfile, readgen32.secfile, NSC_MAXPATH);
3276                 readgen.len = readgen32.len;
3277                 readgen.pos = readgen32.pos;
3278                 readgen.idx = readgen32.idx;
3279                 readgen.flag = readgen32.flag;
3280                 readgen.data = (void *)(unsigned long)readgen32.data;
3281                 readgen.rpcversion = readgen32.rpcversion;
3282         } else {
3283                 if (ddi_copyin(arg, &readgen, sizeof (readgen), mode)) {
3284                         return (EFAULT);
3285                 }
3286         }
3287         switch (readgen.rpcversion) {
3288         case 5:
3289         case 6:
3290                 break;
3291         default:
3292                 return (EINVAL);
3293         }
3294 
3295         mutex_enter(&rdc_conf_lock);
3296         index = rdc_lookup_byhostdev(readgen.sechost, readgen.secfile);
3297         if (index >= 0) {
3298                 krdc = &rdc_k_info[index];
3299         }
3300         if (index < 0 || (krdc->type_flag & RDC_DISABLEPEND)) {
3301                 mutex_exit(&rdc_conf_lock);
3302                 return (ENODEV);
3303         }
3304         /*
3305          * we should really call setbusy here.
3306          */
3307         mutex_exit(&rdc_conf_lock);
3308 
3309         t.tv_sec = rdc_rpc_tmout;
3310         t.tv_usec = 0;
3311         if (krdc->remote_index == -1) {
3312                 cmn_err(CE_WARN, "!Remote index not known");
3313                 ret = EIO;
3314                 goto out;
3315         }
3316         if (readgen.rpcversion == 6) {
3317                 read6.cd = krdc->remote_index;
3318                 read6.len = readgen.len;
3319                 read6.pos = readgen.pos;
3320                 read6.idx = readgen.idx;
3321                 read6.flag = readgen.flag;
3322         } else {
3323                 read5.cd = krdc->remote_index;
3324                 read5.len = readgen.len;
3325                 read5.pos = readgen.pos;
3326                 read5.idx = readgen.idx;
3327                 read5.flag = readgen.flag;
3328         }
3329 
3330         if (readgen.flag & RDC_RREAD_START) {
3331                 if (readgen.rpcversion == 6) {
3332                         err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ6,
3333                             RDC_VERSION6, xdr_rread6, (char *)&read6,
3334                             xdr_int, (char *)&ret, &t);
3335                 } else {
3336                         err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ5,
3337                             RDC_VERSION5, xdr_rread, (char *)&read5,
3338                             xdr_int, (char *)&ret, &t);
3339                 }
3340                 if (err == 0) {
3341                         *rvp = ret;
3342                         ret = 0;
3343                 } else {
3344                         ret = EPROTO;
3345                 }
3346         } else {
3347                 if (readgen.rpcversion == 6) {
3348                         err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ6,
3349                             RDC_VERSION6, xdr_rread6, (char *)&read6,
3350                             xdr_rdresult, (char *)&rr, &t);
3351                 } else {
3352                         err = rdc_clnt_call(krdc->lsrv, RDCPROC_READ5,
3353                             RDC_VERSION5, xdr_rread, (char *)&read5,
3354                             xdr_rdresult, (char *)&rr, &t);
3355                 }
3356                 if (err == 0) {
3357                         if (rr.rr_status != RDC_OK) {
3358                                 ret = EIO;
3359                                 goto out;
3360                         }
3361                         *rvp = rr.rr_bufsize;
3362                         if (ddi_copyout(rr.rr_data, readgen.data,
3363                             rr.rr_bufsize, mode) != 0) {
3364                                 ret = EFAULT;
3365                                 goto out;
3366                         }
3367                         ret = 0;
3368                 } else {
3369                         ret = EPROTO;
3370                         goto out;
3371                 }
3372         }
3373 out:
3374         if (rr.rr_data) {
3375                 kmem_free(rr.rr_data, rr.rr_bufsize);
3376         }
3377         return (ret);
3378 }
3379 
3380 
3381 #endif