/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */


#include <sys/types.h>
#include <sys/ksynch.h>
#include <sys/kmem.h>
#include <sys/errno.h>
#include <sys/cmn_err.h>
#include <sys/debug.h>
#include <sys/cred.h>
#include <sys/file.h>
#include <sys/ddi.h>
#include <sys/nsc_thread.h>
#include <sys/unistat/spcs_s.h>
#include <sys/unistat/spcs_errors.h>

#include <sys/unistat/spcs_s_k.h>
#ifdef DS_DDICT
#include "../contract.h"
#endif

#include <sys/nsctl/nsctl.h>

#include <sys/sdt.h>              /* dtrace is S10 or later */

#include "rdc.h"
#include "rdc_io.h"
#include "rdc_bitmap.h"

/*
 * Remote Dual Copy
 *
 * This file contains the nsctl io provider functionality for RDC.
 *
 * RDC is implemented as a simple filter module that pushes itself between
 * user (SIMCKD, STE, etc.) and SDBC.
 */
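
/*
 * Illustrative stack position (a simplified sketch based on the comment
 * above, not a definitive picture of every configuration):
 *
 *      +------------------------------+
 *      | user (SIMCKD, STE, etc.)     |
 *      +------------------------------+
 *      | RDC (this io provider)       |  intercepts io and replicates
 *      +------------------------------+  it to the remote node
 *      | SDBC (block cache)           |
 *      +------------------------------+
 *      | underlying device            |
 *      +------------------------------+
 */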


static int _rdc_open_count;
int     rdc_eio_nobmp = 0;

nsc_io_t *_rdc_io_hc;
static nsc_io_t *_rdc_io_hr;
static nsc_def_t _rdc_fd_def[], _rdc_io_def[], _rdc_ior_def[];

void _rdc_deinit_dev();
int rdc_diskq_enqueue(rdc_k_info_t *, rdc_aio_t *);
extern void rdc_unintercept_diskq(rdc_group_t *);
rdc_aio_t *rdc_aio_tbuf_get(void *, void *, int, int, int, int, int);

static nsc_buf_t *_rdc_alloc_handle(void (*)(), void (*)(),
    void (*)(), rdc_fd_t *);
static int _rdc_free_handle(rdc_buf_t *, rdc_fd_t *);

#ifdef DEBUG
int     rdc_overlap_cnt;
int     rdc_overlap_hnd_cnt;
#endif

static rdc_info_dev_t *rdc_devices;

extern int _rdc_rsrv_diskq(rdc_group_t *group);
extern void _rdc_rlse_diskq(rdc_group_t *group);

/*
 * _rdc_init_dev
 *      Initialise the io provider.
 */

int
_rdc_init_dev()
{
        _rdc_io_hc = nsc_register_io("rdc-high-cache",
            NSC_RDCH_ID|NSC_REFCNT|NSC_FILTER, _rdc_io_def);
        if (_rdc_io_hc == NULL)
                cmn_err(CE_WARN, "!rdc: nsc_register_io (high, cache) failed.");

        _rdc_io_hr = nsc_register_io("rdc-high-raw",
            NSC_RDCHR_ID|NSC_REFCNT|NSC_FILTER, _rdc_ior_def);
        if (_rdc_io_hr == NULL)
                cmn_err(CE_WARN, "!rdc: nsc_register_io (high, raw) failed.");

        if (!_rdc_io_hc || !_rdc_io_hr) {
                _rdc_deinit_dev();
                return (ENOMEM);
        }

        return (0);
}


/*
 * _rdc_deinit_dev
 *      De-initialise the io provider.
 *
 */

void
_rdc_deinit_dev()
{
        int rc;

        if (_rdc_io_hc) {
                if ((rc = nsc_unregister_io(_rdc_io_hc, 0)) != 0)
                        cmn_err(CE_WARN,
                            "!rdc: nsc_unregister_io (high, cache) failed: %d",
                            rc);
        }

        if (_rdc_io_hr) {
                if ((rc = nsc_unregister_io(_rdc_io_hr, 0)) != 0)
                        cmn_err(CE_WARN,
                            "!rdc: nsc_unregister_io (high, raw) failed: %d",
                            rc);
        }
}


/*
 * rdc_idev_open
 * - Open the nsctl file descriptors for the data devices.
 *
 * Must be called with rdc_conf_lock held.
 * id_sets is protected by rdc_conf_lock.
 */
static rdc_info_dev_t *
rdc_idev_open(rdc_k_info_t *krdc, char *pathname, int *rc)
{
        rdc_info_dev_t *dp;

        ASSERT(MUTEX_HELD(&rdc_conf_lock));

        for (dp = rdc_devices; dp; dp = dp->id_next) {
                if (dp->id_cache_dev.bi_fd &&
                    strcmp(pathname, nsc_pathname(dp->id_cache_dev.bi_fd)) == 0)
                        break;
        }

        if (!dp) {
                dp = kmem_zalloc(sizeof (*dp), KM_SLEEP);
                if (!dp)
                        return (NULL);

                dp->id_cache_dev.bi_krdc = krdc;
                dp->id_cache_dev.bi_fd = nsc_open(pathname,
                    NSC_RDCHR_ID|NSC_RDWR|NSC_DEVICE,
                    _rdc_fd_def, (blind_t)&dp->id_cache_dev, rc);
                if (!dp->id_cache_dev.bi_fd) {
                        kmem_free(dp, sizeof (*dp));
                        return (NULL);
                }

                dp->id_raw_dev.bi_krdc = krdc;
                dp->id_raw_dev.bi_fd = nsc_open(pathname,
                    NSC_RDCHR_ID|NSC_RDWR|NSC_DEVICE,
                    _rdc_fd_def, (blind_t)&dp->id_raw_dev, rc);
                if (!dp->id_raw_dev.bi_fd) {
                        (void) nsc_close(dp->id_cache_dev.bi_fd);
                        kmem_free(dp, sizeof (*dp));
                        return (NULL);
                }

                mutex_init(&dp->id_rlock, NULL, MUTEX_DRIVER, NULL);
                cv_init(&dp->id_rcv, NULL, CV_DRIVER, NULL);

                dp->id_next = rdc_devices;
                rdc_devices = dp;
        }

        dp->id_sets++;
        return (dp);
}
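
/*
 * Illustrative sketch of the open/close pairing and the locking
 * contract (hypothetical caller shown for exposition only; the real
 * callers are rdc_dev_open() and rdc_dev_close() below):
 *
 *      mutex_enter(&rdc_conf_lock);
 *      krdc->devices = rdc_idev_open(krdc, pathname, &rc);
 *      ...
 *      mutex_enter(&krdc->devices->id_rlock);
 *      rdc_idev_close(krdc, krdc->devices);    (releases id_rlock)
 *      mutex_exit(&rdc_conf_lock);
 *
 * Repeated opens of the same pathname share a single rdc_info_dev_t
 * and just bump id_sets; the structure is freed when id_sets drops
 * back to zero.
 */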


/*
 * rdc_idev_close
 * - Close the nsctl file descriptors for the data devices.
 *
 * Must be called with rdc_conf_lock and dp->id_rlock held.
 * Will release dp->id_rlock before returning.
 *
 * id_sets is protected by rdc_conf_lock.
 */
static void
rdc_idev_close(rdc_k_info_t *krdc, rdc_info_dev_t *dp)
{
        rdc_info_dev_t **dpp;
#ifdef DEBUG
        int count = 0;
#endif

        ASSERT(MUTEX_HELD(&rdc_conf_lock));
        ASSERT(MUTEX_HELD(&dp->id_rlock));

        dp->id_sets--;
        if (dp->id_sets > 0) {
                mutex_exit(&dp->id_rlock);
                return;
        }

        /* external references must have gone */
        ASSERT((krdc->c_ref + krdc->r_ref + krdc->b_ref) == 0);

        /* unlink from chain */

        for (dpp = &rdc_devices; *dpp; dpp = &((*dpp)->id_next)) {
                if (*dpp == dp) {
                        /* unlink */
                        *dpp = dp->id_next;
                        break;
                }
        }

        /*
         * Wait for all reserves to go away - the rpc server is
         * running asynchronously with this close, and so we
         * have to wait for it to spot that the krdc is !IS_ENABLED()
         * and throw away the nsc_buf_t's that it has allocated
         * and release the device.
         */

        while (IS_CRSRV(krdc) || IS_RRSRV(krdc)) {
#ifdef DEBUG
                if (!(++count % 16)) {
                        cmn_err(CE_NOTE,
                            "!_rdc_idev_close(%s): waiting for nsc_release",
                            rdc_u_info[krdc->index].primary.file);
                }
                if (count > (16*20)) {
                        /* waited for 20 seconds - too long - panic */
                        cmn_err(CE_PANIC,
                            "!_rdc_idev_close(%s, %p): lost nsc_release",
                            rdc_u_info[krdc->index].primary.file, (void *)krdc);
                }
#endif
                mutex_exit(&dp->id_rlock);
                delay(HZ>>4);
                mutex_enter(&dp->id_rlock);
        }

        if (dp->id_cache_dev.bi_fd) {
                (void) nsc_close(dp->id_cache_dev.bi_fd);
                dp->id_cache_dev.bi_fd = NULL;
        }

        if (dp->id_raw_dev.bi_fd) {
                (void) nsc_close(dp->id_raw_dev.bi_fd);
                dp->id_raw_dev.bi_fd = NULL;
        }

        mutex_exit(&dp->id_rlock);
        mutex_destroy(&dp->id_rlock);
        cv_destroy(&dp->id_rcv);

        kmem_free(dp, sizeof (*dp));
}


/*
 * This function provokes an nsc_reserve() for the device which
 * if successful will populate krdc->maxfbas and urdc->volume_size
 * via the _rdc_attach_fd() callback.
 */
void
rdc_get_details(rdc_k_info_t *krdc)
{
        int rc;
        rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
        nsc_size_t vol_size, maxfbas;

        if (_rdc_rsrv_devs(krdc, RDC_RAW, RDC_INTERNAL) == 0) {
                /*
                 * if the vol is already reserved,
                 * volume_size won't be populated on enable because
                 * it is a *fake* reserve and does not make it to
                 * _rdc_attach_fd(). So do it here.
                 */
                rc = nsc_partsize(RDC_U_FD(krdc), &vol_size);
                if (rc != 0) {
#ifdef DEBUG
                        cmn_err(CE_WARN,
                            "!rdc_get_details: partsize failed (%d)", rc);
#endif /* DEBUG */
                        urdc->volume_size = vol_size = 0;
                }

                urdc->volume_size = vol_size;
                rc = nsc_maxfbas(RDC_U_FD(krdc), 0, &maxfbas);
                if (rc != 0) {
#ifdef DEBUG
                        cmn_err(CE_WARN,
                            "!rdc_get_details: maxfbas failed (%d)", rc);
#endif /* DEBUG */
                        maxfbas = 0;
                }
                krdc->maxfbas = min(RDC_MAX_MAXFBAS, maxfbas);

                _rdc_rlse_devs(krdc, RDC_RAW);
        }
}


/*
 * Should only be used by the config code.
 */

int
rdc_dev_open(rdc_set_t *rdc_set, int options)
{
        rdc_k_info_t *krdc;
        int index;
        int rc;
        char *pathname;

        ASSERT(MUTEX_HELD(&rdc_conf_lock));

        if (options & RDC_OPT_PRIMARY)
                pathname = rdc_set->primary.file;
        else
                pathname = rdc_set->secondary.file;

        for (index = 0; index < rdc_max_sets; index++) {
                krdc = &rdc_k_info[index];

                if (!IS_CONFIGURED(krdc))
                        break;
        }

        if (index == rdc_max_sets) {
#ifdef DEBUG
                cmn_err(CE_WARN, "!rdc_dev_open: out of cd\'s");
#endif
                index = -EINVAL;
                goto out;
        }

        if (krdc->devices && (krdc->c_fd || krdc->r_fd)) {
#ifdef DEBUG
                cmn_err(CE_WARN, "!rdc_dev_open: %s already open", pathname);
#endif
                index = -EINVAL;
                goto out;
        }

        _rdc_open_count++;

        krdc->devices = rdc_idev_open(krdc, pathname, &rc);
        if (!krdc->devices) {
                index = -rc;
                goto open_fail;
        }

        /*
         * Grab the device size and maxfbas now.
         */

        rdc_get_details(krdc);

out:
        return (index);

open_fail:
        _rdc_open_count--;

        return (index);
}


void
rdc_dev_close(rdc_k_info_t *krdc)
{
        rdc_u_info_t *urdc = &rdc_u_info[krdc->index];

        mutex_enter(&rdc_conf_lock);

        if (krdc->devices)
                mutex_enter(&krdc->devices->id_rlock);

#ifdef DEBUG
        if (!krdc->devices || !krdc->c_fd || !krdc->r_fd) {
                cmn_err(CE_WARN,
                    "!rdc_dev_close(%p): c_fd %p r_fd %p", (void *)krdc,
                    (void *) (krdc->devices ? krdc->c_fd : 0),
                    (void *) (krdc->devices ? krdc->r_fd : 0));
        }
#endif

        if (krdc->devices) {
                /* rdc_idev_close will release id_rlock */
                rdc_idev_close(krdc, krdc->devices);
                krdc->devices = NULL;
        }

        urdc->primary.file[0] = '\0';

        if (_rdc_open_count <= 0) {
                cmn_err(CE_WARN, "!rdc: _rdc_open_count corrupt: %d",
                    _rdc_open_count);
        }

        _rdc_open_count--;

        mutex_exit(&rdc_conf_lock);
}


/*
 * rdc_intercept
 *
 * Register for IO on this device with nsctl.
 *
 * For a 1-to-many primary we register for each krdc and let nsctl sort
 * out which it wants to be using. This means that we cannot tell which
 * krdc will receive the incoming io from nsctl, though we do know that
 * at any one time only one krdc will be 'attached' and so get io from
 * nsctl.
 *
 * So the krdc->many_next pointer is maintained as a circular list. The
 * result of these multiple nsc_register_paths is that we will see a
 * few more attach and detach io provider calls during enable/resume
 * and disable/suspend of the 1-to-many whilst nsctl settles down to
 * using a single krdc.
 *
 * The major advantage of this scheme is that nsctl sorts out all the
 * rdc_fd_t's so that they can only point to krdc's that are currently
 * active.
 */
int
rdc_intercept(rdc_k_info_t *krdc)
{
        rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
        char *pathname;
        char *bitmap;

        if (rdc_get_vflags(urdc) & RDC_PRIMARY) {
                pathname = urdc->primary.file;
                bitmap = urdc->primary.bitmap;
        } else {
                pathname = urdc->secondary.file;
                bitmap = urdc->secondary.bitmap;
        }

        if (!krdc->b_tok)
                krdc->b_tok = nsc_register_path(bitmap, NSC_CACHE | NSC_DEVICE,
                    _rdc_io_hc);

        if (!krdc->c_tok)
                krdc->c_tok = nsc_register_path(pathname, NSC_CACHE,
                    _rdc_io_hc);

        if (!krdc->r_tok)
                krdc->r_tok = nsc_register_path(pathname, NSC_DEVICE,
                    _rdc_io_hr);

        if (!krdc->c_tok || !krdc->r_tok) {
                (void) rdc_unintercept(krdc);
                return (ENXIO);
        }

        return (0);
}
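
/*
 * A minimal sketch of walking the circular many_next list described
 * above (the same pattern is used by _rdc_remote_read() below);
 * 'this' records the starting krdc so the walk terminates after one
 * full cycle:
 *
 *      rdc_k_info_t *this = krdc;
 *
 *      rdc_many_enter(krdc);
 *      for (krdc = krdc->many_next; krdc != this;
 *          krdc = krdc->many_next) {
 *              if (!IS_ENABLED(&rdc_u_info[krdc->index]))
 *                      continue;
 *              ... inspect or select this set ...
 *      }
 *      rdc_many_exit(krdc);
 */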


static void
wait_unregistering(rdc_k_info_t *krdc)
{
        while (krdc->group->unregistering > 0)
                (void) cv_wait_sig(&krdc->group->unregistercv, &rdc_conf_lock);
}

static void
set_unregistering(rdc_k_info_t *krdc)
{
        wait_unregistering(krdc);

        krdc->group->unregistering++;
}

static void
wakeup_unregistering(rdc_k_info_t *krdc)
{
        if (krdc->group->unregistering <= 0)
                return;

        krdc->group->unregistering--;
        cv_broadcast(&krdc->group->unregistercv);
}
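
/*
 * These three helpers form a simple gate around path unregistration:
 * set_unregistering() waits for any in-flight unregister to finish and
 * then claims the gate; wakeup_unregistering() releases it and wakes
 * any waiters.  A minimal sketch of the intended call sequence, with
 * rdc_conf_lock held around each helper (see rdc_unintercept() below):
 *
 *      mutex_enter(&rdc_conf_lock);
 *      set_unregistering(krdc);        claim the gate
 *      mutex_exit(&rdc_conf_lock);
 *
 *      ... nsc_unregister_path() calls, drain outstanding io ...
 *
 *      mutex_enter(&rdc_conf_lock);
 *      wakeup_unregistering(krdc);     release gate, cv_broadcast
 *      mutex_exit(&rdc_conf_lock);
 */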


/*
 * rdc_unintercept
 *
 * Unregister for IO on this device.
 *
 * See comments above rdc_intercept.
 */
int
rdc_unintercept(rdc_k_info_t *krdc)
{
        int err = 0;
        int rc;
        rdc_u_info_t *urdc = &rdc_u_info[krdc->index];

        mutex_enter(&rdc_conf_lock);
        set_unregistering(krdc);
        krdc->type_flag |= RDC_UNREGISTER;
        mutex_exit(&rdc_conf_lock);

        if (krdc->r_tok) {
                rc = nsc_unregister_path(krdc->r_tok, 0);
                if (rc) {
                        cmn_err(CE_WARN, "!rdc: unregister rawfd %d", rc);
                        err = rc;
                }
                krdc->r_tok = NULL;
        }

        if (krdc->c_tok) {
                rc = nsc_unregister_path(krdc->c_tok, 0);
                if (rc) {
                        cmn_err(CE_WARN, "!rdc: unregister cachefd %d", rc);
                        if (!err)
                                err = rc;
                }
                krdc->c_tok = NULL;
        }

        if (krdc->b_tok) {
                rc = nsc_unregister_path(krdc->b_tok, 0);
                if (rc) {
                        cmn_err(CE_WARN, "!rdc: unregister bitmap %d", rc);
                        err = rc;
                }
                krdc->b_tok = NULL;
        }

        rdc_group_enter(krdc);

        /* Wait for all necessary _rdc_close() calls to complete */
        while ((krdc->c_ref + krdc->r_ref + krdc->b_ref) != 0) {
                krdc->closing++;
                cv_wait(&krdc->closingcv, &krdc->group->lock);
                krdc->closing--;
        }

        rdc_clr_flags(urdc, RDC_ENABLED);
        rdc_group_exit(krdc);


        /*
         * Check there are no outstanding writes in progress.
         * This can happen when a set is being disabled which
         * is one of the 'one_to_many' chain, that did not
         * intercept the original write call.
         */

        for (;;) {
                rdc_group_enter(krdc);
                if (krdc->aux_state & RDC_AUXWRITE) {
                        rdc_group_exit(krdc);
                        /*
                         * This doesn't happen very often,
                         * just delay a bit and re-look.
                         */
                        delay(50);
                } else {
                        rdc_group_exit(krdc);
                        break;
                }
        }

        mutex_enter(&rdc_conf_lock);
        krdc->type_flag &= ~RDC_UNREGISTER;
        wakeup_unregistering(krdc);
        mutex_exit(&rdc_conf_lock);

        return (err);
}


/*
 * _rdc_rlse_d
 *      Internal version of _rdc_rlse_devs(), only concerned with the
 *      data device, not the bitmap.
 */

static void
_rdc_rlse_d(rdc_k_info_t *krdc, int devs)
{
        _rdc_info_dev_t *cip;
        _rdc_info_dev_t *rip;
        int raw = (devs & RDC_RAW);

        if (!krdc) {
                cmn_err(CE_WARN, "!rdc: _rdc_rlse_devs null krdc");
                return;
        }

        ASSERT((devs & (~RDC_BMP)) != 0);

        cip = &krdc->devices->id_cache_dev;
        rip = &krdc->devices->id_raw_dev;

        if (IS_RSRV(cip)) {
                /* decrement count */

                if (raw) {
                        if (cip->bi_ofailed > 0) {
                                cip->bi_ofailed--;
                        } else if (cip->bi_orsrv > 0) {
                                cip->bi_orsrv--;
                        }
                } else {
                        if (cip->bi_failed > 0) {
                                cip->bi_failed--;
                        } else if (cip->bi_rsrv > 0) {
                                cip->bi_rsrv--;
                        }
                }

                /*
                 * reset nsc_fd ownership back link, it is only set if
                 * we have really done an underlying reserve, not for
                 * failed (faked) reserves.
                 */

                if (cip->bi_rsrv > 0 || cip->bi_orsrv > 0) {
                        nsc_set_owner(cip->bi_fd, krdc->iodev);
                } else {
                        nsc_set_owner(cip->bi_fd, NULL);
                }

                /* release nsc_fd */

                if (!IS_RSRV(cip)) {
                        nsc_release(cip->bi_fd);
                }
        } else if (IS_RSRV(rip)) {
                /* decrement count */

                if (raw) {
                        if (rip->bi_failed > 0) {
                                rip->bi_failed--;
                        } else if (rip->bi_rsrv > 0) {
                                rip->bi_rsrv--;
                        }
                } else {
                        if (rip->bi_ofailed > 0) {
                                rip->bi_ofailed--;
                        } else if (rip->bi_orsrv > 0) {
                                rip->bi_orsrv--;
                        }
                }

                /*
                 * reset nsc_fd ownership back link, it is only set if
                 * we have really done an underlying reserve, not for
                 * failed (faked) reserves.
                 */

                if (rip->bi_rsrv > 0 || rip->bi_orsrv > 0) {
                        nsc_set_owner(rip->bi_fd, krdc->iodev);
                } else {
                        nsc_set_owner(rip->bi_fd, NULL);
                }

                /* release nsc_fd and any waiters */

                if (!IS_RSRV(rip)) {
                        rip->bi_flag = 0;
                        nsc_release(rip->bi_fd);
                        cv_broadcast(&krdc->devices->id_rcv);
                }
        } else {
                cmn_err(CE_WARN, "!rdc: _rdc_rlse_devs no reserve? krdc %p",
                    (void *) krdc);
        }
}

/*
 * _rdc_rlse_devs
 *      Release named underlying devices and take care of setting the
 *      back link on the nsc_fd to the correct parent iodev.
 *
 *      NOTE: the 'devs' argument must be the same as that passed to
 *      the preceding _rdc_rsrv_devs call.
 */

void
_rdc_rlse_devs(rdc_k_info_t *krdc, int devs)
{

        DTRACE_PROBE(_rdc_rlse_devs_start);
        mutex_enter(&krdc->devices->id_rlock);

        ASSERT(!(devs & RDC_CACHE));

        if ((devs & (~RDC_BMP)) != 0) {
                _rdc_rlse_d(krdc, devs);
        }

        if ((devs & RDC_BMP) != 0) {
                if (krdc->bmaprsrv > 0 && --krdc->bmaprsrv == 0) {
                        nsc_release(krdc->bitmapfd);
                }
        }

        mutex_exit(&krdc->devices->id_rlock);

}
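
/*
 * A minimal sketch of the reserve/release pairing required by the
 * NOTE above - the 'devs' mask passed to _rdc_rlse_devs() must match
 * the one passed to the preceding _rdc_rsrv_devs() call (hypothetical
 * caller for exposition only; rdc_get_details() above is a real
 * example using RDC_RAW alone):
 *
 *      if (_rdc_rsrv_devs(krdc, RDC_RAW | RDC_BMP, RDC_INTERNAL) == 0) {
 *              ... io against RDC_U_FD(krdc) and the bitmap ...
 *              _rdc_rlse_devs(krdc, RDC_RAW | RDC_BMP);
 *      }
 */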

/*
 * _rdc_rsrv_d
 *      Reserve the flagged device, unless its companion is already
 *      reserved, in which case increase the reserve on the companion.
 *      Take care of setting the nsc_fd ownership back link to the
 *      correct parent iodev pointer.
 */

static int
_rdc_rsrv_d(int raw, _rdc_info_dev_t *rid, _rdc_info_dev_t *cid, int flag,
    rdc_k_info_t *krdc)
{
        _rdc_info_dev_t *p = NULL;
        rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
        int other = 0;
        int rc;


#ifdef DEBUG
        if ((rid->bi_rsrv < 0) ||
            (cid->bi_rsrv < 0) ||
            (rid->bi_orsrv < 0) ||
            (cid->bi_orsrv < 0) ||
            (rid->bi_failed < 0) ||
            (cid->bi_failed < 0) ||
            (rid->bi_ofailed < 0) ||
            (cid->bi_ofailed < 0)) {
                cmn_err(CE_WARN,
                    "!_rdc_rsrv_d: negative counts (rsrv %d %d orsrv %d %d)",
                    rid->bi_rsrv, cid->bi_rsrv,
                    rid->bi_orsrv, cid->bi_orsrv);
                cmn_err(CE_WARN,
                    "!_rdc_rsrv_d: negative counts (fail %d %d ofail %d %d)",
                    rid->bi_failed, cid->bi_failed,
                    rid->bi_ofailed, cid->bi_ofailed);
                cmn_err(CE_PANIC, "_rdc_rsrv_d: negative counts (krdc %p)",
                    (void *) krdc);
        }
#endif

        /*
         * If user wants to do a cache reserve and it's already
         * raw reserved internally, we need to do a real nsc_reserve, so wait
         * until the release has been done.
         */
        if (IS_RSRV(rid) && (flag == RDC_EXTERNAL) &&
            (raw == 0) && (rid->bi_flag != RDC_EXTERNAL)) {
                krdc->devices->id_release++;
                while (IS_RSRV(rid))
                        cv_wait(&krdc->devices->id_rcv,
                            &krdc->devices->id_rlock);
                krdc->devices->id_release--;
        }

        /* select underlying device to use */

        if (IS_RSRV(rid)) {
                p = rid;
                if (!raw) {
                        other = 1;
                }
        } else if (IS_RSRV(cid)) {
                p = cid;
                if (raw) {
                        other = 1;
                }
        }

        /* just increment count and return if already reserved */

        if (p && !RFAILED(p)) {
                if (other) {
                        p->bi_orsrv++;
                } else {
                        p->bi_rsrv++;
                }

                /* set nsc_fd ownership back link */
                nsc_set_owner(p->bi_fd, krdc->iodev);
                return (0);
        }

        /* attempt reserve */

        if (!p) {
                p = raw ? rid : cid;
        }

        if (!p->bi_fd) {
                /* rpc server raced with rdc_dev_close() */
                return (EIO);
        }
        if ((rc = nsc_reserve(p->bi_fd, 0)) == 0) {
                /*
                 * convert failed counts into reserved counts, and add
                 * in this reserve.
                 */

                p->bi_orsrv = p->bi_ofailed;
                p->bi_rsrv = p->bi_failed;

                if (other) {
                        p->bi_orsrv++;
                } else {
                        p->bi_rsrv++;
                }

                p->bi_ofailed = 0;
                p->bi_failed = 0;

                /* set nsc_fd ownership back link */

                nsc_set_owner(p->bi_fd, krdc->iodev);
        } else if (rc != EINTR) {
                /*
                 * If this is the master, and the secondary is not
                 * failed, then just fake this external reserve so that
                 * we can do remote io to the secondary and continue to
                 * provide service to the client.
                 *
                 * Subsequent calls to _rdc_rsrv_d() will re-try the
                 * nsc_reserve() until it succeeds.
                 */

                if ((rdc_get_vflags(urdc) & RDC_PRIMARY) &&
                    !(rdc_get_vflags(urdc) & RDC_LOGGING) &&
                    !((rdc_get_vflags(urdc) & RDC_SLAVE) &&
                    (rdc_get_vflags(urdc) & RDC_SYNCING))) {
                        if (!(rdc_get_vflags(urdc) & RDC_VOL_FAILED)) {
                                rdc_many_enter(krdc);
                                /* Primary, so reverse sync needed */
                                rdc_set_mflags(urdc, RDC_RSYNC_NEEDED);
                                rdc_set_flags_log(urdc, RDC_VOL_FAILED,
                                    "nsc_reserve failed");
                                rdc_many_exit(krdc);
                                rc = -1;
#ifdef DEBUG
                                cmn_err(CE_NOTE, "!nsc_reserve failed "
                                    "with rc == %d\n", rc);
#endif
                        } else {
                                rc = 0;
                        }

                        if (other) {
                                p->bi_ofailed++;
                        } else {
                                p->bi_failed++;
                        }

                        if (krdc->maxfbas == 0) {
                                /*
                                 * fake a maxfbas value for remote i/o,
                                 * this will get reset when the next
                                 * successful reserve happens as part
                                 * of the rdc_attach_fd() callback.
                                 */
                                krdc->maxfbas = 128;
                        }
                }
        }

        if (rc == 0 && raw) {
                p->bi_flag = flag;
        }


        return (rc);
}

/*
 * _rdc_rsrv_devs
 *      Reserve named underlying devices.
 *
 */

int
_rdc_rsrv_devs(rdc_k_info_t *krdc, int devs, int flag)
{
        rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
        int write = 0;
        int rc = 0;
        int got = 0;

        if (!krdc) {
                return (EINVAL);
        }

        ASSERT(!(devs & RDC_CACHE));

        mutex_enter(&krdc->devices->id_rlock);

        if ((devs & (~RDC_BMP)) != 0) {
                if ((rc = _rdc_rsrv_d((devs & RDC_CACHE) == 0,
                    &krdc->devices->id_raw_dev, &krdc->devices->id_cache_dev,
                    flag, krdc)) != 0) {
                        if (rc == -1) {
                                /*
                                 * we need to call rdc_write_state()
                                 * after we drop the mutex
                                 */
                                write = 1;
                                rc = 0;
                        } else {
                                cmn_err(CE_WARN,
                                    "!rdc: nsc_reserve(%s) failed %d\n",
                                    nsc_pathname(krdc->c_fd), rc);
                        }
                } else {
                        got |= (devs & (~RDC_BMP));
                }
        }

        if (rc == 0 && (devs & RDC_BMP) != 0) {
                if (krdc->bitmapfd == NULL)
                        rc = EIO;
                else if ((krdc->bmaprsrv == 0) &&
                    (rc = nsc_reserve(krdc->bitmapfd, 0)) != 0) {
                        cmn_err(CE_WARN, "!rdc: nsc_reserve(%s) failed %d\n",
                            nsc_pathname(krdc->bitmapfd), rc);
                } else {
                        krdc->bmaprsrv++;
                        got |= RDC_BMP;
                }
                if (!RDC_SUCCESS(rc)) {
                        /* Undo any previous reserve */
                        if (got != 0)
                                _rdc_rlse_d(krdc, got);
                }
        }

        mutex_exit(&krdc->devices->id_rlock);

        if (write) {
                rdc_write_state(urdc);
        }

        return (rc);
}


/*
 * Read from the remote end, ensuring that if this is a many group in
 * slave mode that we only remote read from the secondary with the
 * valid data.
 */
int
_rdc_remote_read(rdc_k_info_t *krdc, nsc_buf_t *h, nsc_off_t pos,
    nsc_size_t len, int flag)
{
        rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
        rdc_k_info_t *this = krdc;      /* krdc that was requested */
        int rc;

        if (flag & NSC_RDAHEAD) {
                /*
                 * no point in doing readahead remotely,
                 * just say we did it ok - the client is about to
                 * throw this buffer away as soon as we return.
                 */
                return (NSC_DONE);
        }

        /*
         * If this is a many group with a reverse sync in progress and
         * this is not the slave krdc/urdc, then search for the slave
         * so that we can do the remote io from the correct secondary.
         */
        if ((rdc_get_mflags(urdc) & RDC_SLAVE) &&
            !(rdc_get_vflags(urdc) & RDC_SLAVE)) {
                rdc_many_enter(krdc);
                for (krdc = krdc->many_next; krdc != this;
                    krdc = krdc->many_next) {
                        urdc = &rdc_u_info[krdc->index];
                        if (!IS_ENABLED(urdc))
                                continue;
                        if (rdc_get_vflags(urdc) & RDC_SLAVE)
                                break;
                }
                rdc_many_exit(krdc);

                this = krdc;
        }

read1:
        if (rdc_get_vflags(urdc) & RDC_LOGGING) {
                /* cannot do remote io without the remote node! */
                rc = ENETDOWN;
                goto read2;
        }


        /* wait for the remote end to have the latest data */

        if (IS_ASYNC(urdc)) {
                while (krdc->group->ra_queue.blocks != 0) {
                        if (!krdc->group->rdc_writer)
                                (void) rdc_writer(krdc->index);

                        (void) rdc_drain_queue(krdc->index);
                }
        }

        if (krdc->io_kstats) {
                mutex_enter(krdc->io_kstats->ks_lock);
                kstat_runq_enter(KSTAT_IO_PTR(krdc->io_kstats));
                mutex_exit(krdc->io_kstats->ks_lock);
        }

        rc = rdc_net_read(krdc->index, krdc->remote_index, h, pos, len);

        if (krdc->io_kstats) {
                mutex_enter(krdc->io_kstats->ks_lock);
                kstat_runq_exit(KSTAT_IO_PTR(krdc->io_kstats));
                mutex_exit(krdc->io_kstats->ks_lock);
        }

        /* If read error keep trying every secondary until no more */
read2:
        if (!RDC_SUCCESS(rc) && IS_MANY(krdc) &&
            !(rdc_get_mflags(urdc) & RDC_SLAVE)) {
                rdc_many_enter(krdc);
                for (krdc = krdc->many_next; krdc != this;
                    krdc = krdc->many_next) {
                        urdc = &rdc_u_info[krdc->index];
                        if (!IS_ENABLED(urdc))
                                continue;
                        rdc_many_exit(krdc);
                        goto read1;
                }
                rdc_many_exit(krdc);
        }

        return (rc);
}


/*
 * _rdc_alloc_buf
 *      Allocate a buffer of data
 *
 * Calling/Exit State:
 *      Returns NSC_DONE or NSC_HIT for success, NSC_PENDING for async
 *      I/O, > 0 is an error code.
 *
 * Description:
 */
int rdcbufs = 0;

static int
_rdc_alloc_buf(rdc_fd_t *rfd, nsc_off_t pos, nsc_size_t len, int flag,
    rdc_buf_t **ptr)
{
        rdc_k_info_t *krdc = rfd->rdc_info;
        rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
        nsc_vec_t *vec = NULL;
        rdc_buf_t *h;
        size_t size;
        int ioflag;
        int rc = 0;

        if (RDC_IS_BMP(rfd) || RDC_IS_QUE(rfd))
                return (EIO);

        if (len == 0)
                return (EINVAL);

        if (flag & NSC_WRBUF) {

                if (!(rdc_get_vflags(urdc) & RDC_PRIMARY) &&
                    !(rdc_get_vflags(urdc) & RDC_LOGGING)) {
                        /*
                         * Forbid writes to secondary unless logging.
                         */
                        return (EIO);
                }
        }

        if (!(rdc_get_vflags(urdc) & RDC_PRIMARY) &&
            (rdc_get_vflags(urdc) & RDC_SYNC_NEEDED)) {
                /*
                 * Forbid any io to secondary if it needs a sync.
                 */
                return (EIO);
        }

        if ((rdc_get_vflags(urdc) & RDC_PRIMARY) &&
            (rdc_get_vflags(urdc) & RDC_RSYNC_NEEDED) &&
            !(rdc_get_vflags(urdc) & RDC_VOL_FAILED) &&
            !(rdc_get_vflags(urdc) & RDC_SLAVE)) {
                /*
                 * Forbid any io to primary if it needs a reverse sync
                 * and is not actively syncing.
                 */
                return (EIO);
        }

        /* Bounds checking */
        ASSERT(urdc->volume_size != 0);
        if (pos + len > urdc->volume_size) {
#ifdef DEBUG
                cmn_err(CE_NOTE,
                    "!rdc: Attempt to access beyond end of rdc volume");
#endif
                return (EIO);
        }

        h = *ptr;
        if (h == NULL) {
                /* should never happen (nsctl does this for us) */
#ifdef DEBUG
                cmn_err(CE_WARN, "!_rdc_alloc_buf entered without buffer!");
#endif
                h = (rdc_buf_t *)_rdc_alloc_handle(NULL, NULL, NULL, rfd);
                if (h == NULL)
                        return (ENOMEM);

                h->rdc_bufh.sb_flag &= ~NSC_HALLOCATED;
                *ptr = h;
        }

        if (flag & NSC_NOBLOCK) {
                cmn_err(CE_WARN,
                    "!_rdc_alloc_buf: removing unsupported NSC_NOBLOCK flag");
                flag &= ~(NSC_NOBLOCK);
        }

        h->rdc_bufh.sb_error = 0;
        h->rdc_bufh.sb_flag |= flag;
        h->rdc_bufh.sb_pos = pos;
        h->rdc_bufh.sb_len = len;
        ioflag = flag;

        bzero(&h->rdc_sync, sizeof (h->rdc_sync));
        mutex_init(&h->rdc_sync.lock, NULL, MUTEX_DRIVER, NULL);
        cv_init(&h->rdc_sync.cv, NULL, CV_DRIVER, NULL);

        if (flag & NSC_WRBUF)
                _rdc_async_throttle(krdc, len); /* throttle incoming io */

        /*
         * Use remote io when:
         * - local volume is failed
         * - reserve status is failed
         */
        if ((rdc_get_vflags(urdc) & RDC_VOL_FAILED) || IS_RFAILED(krdc)) {
                rc = EIO;
        } else {
                rc = nsc_alloc_buf(RDC_U_FD(krdc), pos, len,
                    ioflag, &h->rdc_bufp);
                if (!RDC_SUCCESS(rc)) {
                        rdc_many_enter(krdc);
                        if (rdc_get_vflags(urdc) & RDC_PRIMARY) {
                                /* Primary, so reverse sync needed */
                                rdc_set_mflags(urdc, RDC_RSYNC_NEEDED);
                        } else {
                                /* Secondary, so forward sync needed */
                                rdc_set_flags(urdc, RDC_SYNC_NEEDED);
                        }
                        rdc_set_flags_log(urdc, RDC_VOL_FAILED,
                            "nsc_alloc_buf failed");
                        rdc_many_exit(krdc);
                        rdc_write_state(urdc);
                }
        }

        if (RDC_SUCCESS(rc)) {
                h->rdc_bufh.sb_vec = h->rdc_bufp->sb_vec;
                h->rdc_flags |= RDC_ALLOC;

                /*
                 * If in slave and reading data, remote read on top of
                 * the buffer to ensure that we have the latest data.
                 */
                if ((flag & NSC_READ) &&
                    (rdc_get_vflags(urdc) & RDC_PRIMARY) &&
                    (rdc_get_mflags(urdc) & RDC_SLAVE)) {
                        rc = _rdc_remote_read(krdc, &h->rdc_bufh,
                            pos, len, flag);
                        /*
                         * Set NSC_MIXED so that the
                         * cache will throw away this buffer when we free
                         * it since we have combined data from multiple
                         * sources into a single buffer.
                         */
                        h->rdc_bufp->sb_flag |= NSC_MIXED;
                }
        }
        /*
         * If nsc_alloc_buf above failed, or the local volume, bitmap
         * or reserve has failed, then fill the buffer from the remote
         * node instead.
         */

        if ((!RDC_SUCCESS(rc)) && (rdc_get_vflags(urdc) & RDC_PRIMARY) &&
            !(rdc_get_vflags(urdc) & RDC_LOGGING)) {
                if (flag & NSC_NODATA) {
                        ASSERT(!(flag & NSC_READ));
                        h->rdc_flags |= RDC_REMOTE_BUF;
                        h->rdc_bufh.sb_vec = NULL;
                } else {
                        size = sizeof (nsc_vec_t) * 2;
                        h->rdc_vsize = size + FBA_SIZE(len);
                        vec = kmem_zalloc(h->rdc_vsize, KM_SLEEP);

                        if (!vec) {
                                rc = ENOMEM;
                                goto error;
                        }

                        /* single flat buffer */

                        vec[0].sv_addr = (uchar_t *)vec + size;
                        vec[0].sv_len  = FBA_SIZE(len);
                        vec[0].sv_vme  = 0;

                        /* null terminator */

                        vec[1].sv_addr = NULL;
                        vec[1].sv_len  = 0;
                        vec[1].sv_vme  = 0;

                        h->rdc_bufh.sb_vec = vec;
                        h->rdc_flags |= RDC_REMOTE_BUF;
                        h->rdc_flags |= RDC_VEC_ALLOC;
                }

                if (flag & NSC_READ) {
                        rc = _rdc_remote_read(krdc, &h->rdc_bufh,
                            pos, len, flag);
                } else {
                        rc = NSC_DONE;
                }
        }
error:
        if (!RDC_SUCCESS(rc)) {
                h->rdc_bufh.sb_error = rc;
        }

        return (rc);
}


/*
 * _rdc_free_buf
 */

static int
_rdc_free_buf(rdc_buf_t *h)
{
        int rc = 0;

        if (h->rdc_flags & RDC_ALLOC) {
                if (h->rdc_bufp) {
                        rc = nsc_free_buf(h->rdc_bufp);
                }
                h->rdc_flags &= ~(RDC_ALLOC);

                if (!RDC_SUCCESS(rc)) {
#ifdef DEBUG
                        cmn_err(CE_WARN,
                            "!_rdc_free_buf(%p): nsc_free_buf(%p) returned %d",
                            (void *) h, (void *) h->rdc_bufp, rc);
#endif
                        return (rc);
                }
        }

        if (h->rdc_flags & (RDC_REMOTE_BUF|RDC_VEC_ALLOC)) {
                if (h->rdc_flags & RDC_VEC_ALLOC) {
                        kmem_free(h->rdc_bufh.sb_vec, h->rdc_vsize);
                }
                h->rdc_flags &= ~(RDC_REMOTE_BUF|RDC_VEC_ALLOC);
        }

        if (h->rdc_anon) {
                /* anon buffers still pending */
                DTRACE_PROBE1(rdc_free_buf_err, aio_buf_t, h->rdc_anon);
        }

        if ((h->rdc_bufh.sb_flag & NSC_HALLOCATED) == 0) {
                rc = _rdc_free_handle(h, h->rdc_fd);
                if (!RDC_SUCCESS(rc)) {
#ifdef DEBUG
                        cmn_err(CE_WARN,
                            "!_rdc_free_buf(%p): _rdc_free_handle returned %d",
                            (void *) h, rc);
#endif
                        return (rc);
                }
        } else {
                h->rdc_bufh.sb_flag = NSC_HALLOCATED;
                h->rdc_bufh.sb_vec = NULL;
                h->rdc_bufh.sb_error = 0;
                h->rdc_bufh.sb_pos = 0;
                h->rdc_bufh.sb_len = 0;
                h->rdc_anon = NULL;
                h->rdc_vsize = 0;

                cv_destroy(&h->rdc_sync.cv);
                mutex_destroy(&h->rdc_sync.lock);

        }

        return (0);
}
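
/*
 * Illustrative sketch of the buffer lifecycle that the handle/buffer
 * routines in this file implement (simplified; nsctl normally drives
 * these through the _rdc_fd_def definition table declared above,
 * not by direct calls like this):
 *
 *      rdc_buf_t *h;
 *      int rc;
 *
 *      h = (rdc_buf_t *)_rdc_alloc_handle(NULL, NULL, NULL, rfd);
 *      h->rdc_bufh.sb_flag &= ~NSC_HALLOCATED;
 *      rc = _rdc_alloc_buf(rfd, pos, len, NSC_READ | NSC_WRBUF, &h);
 *      if (RDC_SUCCESS(rc)) {
 *              ... use the data mapped by h->rdc_bufh.sb_vec ...
 *      }
 *      (void) _rdc_free_buf(h);        also frees the handle, since
 *                                      NSC_HALLOCATED was cleared
 */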


/*
 * _rdc_open
 *      Open a device
 *
 * Calling/Exit State:
 *      Returns a token to identify the device.
 *
 * Description:
 *      Performs the housekeeping operations associated with an upper layer
 *      of the nsctl stack opening a device.
 */

/* ARGSUSED */

static int
_rdc_open(char *path, int flag, blind_t *cdp, nsc_iodev_t *iodev)
{
        rdc_k_info_t *krdc;
#ifdef DEBUG
        rdc_u_info_t *urdc;
#endif
        rdc_fd_t *rfd;
        int raw = ((flag & NSC_CACHE) == 0);
        int index;
        int bmp = 0;
        int queue = 0;

        rfd = kmem_zalloc(sizeof (*rfd), KM_SLEEP);
        if (!rfd)
                return (ENOMEM);

        /*
         * Take config lock to prevent a race with the
         * (de)configuration code.
         */

        mutex_enter(&rdc_conf_lock);

        index = rdc_lookup_enabled(path, 0);
        if (index < 0) {
                index = rdc_lookup_bitmap(path);
                if (index >= 0)
                        bmp = 1;
        }
        if (index < 0) {
                index = rdc_lookup_diskq(path);
                if (index >= 0)
                        queue = 1;
        }
        if (index < 0) {
                /* not found in config */
                mutex_exit(&rdc_conf_lock);
                kmem_free(rfd, sizeof (*rfd));
                return (ENXIO);
        }
#ifdef DEBUG
        urdc = &rdc_u_info[index];
#endif
        krdc = &rdc_k_info[index];

        mutex_exit(&rdc_conf_lock);

        rdc_group_enter(krdc);

        ASSERT(IS_ENABLED(urdc));

        if (bmp) {
                krdc->b_ref++;
        } else if (raw) {
                krdc->r_ref++;
        } else if (!queue) {
                krdc->c_ref++;
        }

        rfd->rdc_info = krdc;
        if (bmp)
                rfd->rdc_type = RDC_BMP;
        else if (queue)
                rfd->rdc_type = RDC_QUE;
        else
                rfd->rdc_oflags = flag;

        rdc_group_exit(krdc);

        *cdp = (blind_t)rfd;

        return (0);
}

static int
_rdc_openc(char *path, int flag, blind_t *cdp, nsc_iodev_t *iodev)
{
        return (_rdc_open(path, NSC_CACHE|flag, cdp, iodev));
}

static int
_rdc_openr(char *path, int flag, blind_t *cdp, nsc_iodev_t *iodev)
{
        return (_rdc_open(path, NSC_DEVICE|flag, cdp, iodev));
}


/*
 * _rdc_close
 *      Close a device
 *
 * Calling/Exit State:
 *      Always succeeds - returns 0
 *
 * Description:
 *      Performs the housekeeping operations associated with an upper layer
 *      of the sd stack closing a shadowed device.
 */

static int
_rdc_close(rdc_fd_t *rfd)
{
        rdc_k_info_t *krdc = rfd->rdc_info;
        int bmp = RDC_IS_BMP(rfd);
        int raw = RDC_IS_RAW(rfd);
        int queue = RDC_IS_QUE(rfd);

        /*
         * we don't keep ref counts for the queue, so skip this stuff.
         * we may not even have a valid krdc at this point
         */
        if (queue)
                goto queue;
        rdc_group_enter(krdc);

        if (bmp) {
                krdc->b_ref--;
        } else if (raw && !queue) {
                krdc->r_ref--;
        } else if (!queue) {
                krdc->c_ref--;
        }

        if (krdc->closing) {
                cv_broadcast(&krdc->closingcv);
        }

        rdc_group_exit(krdc);
queue:
        kmem_free(rfd, sizeof (*rfd));
        return (0);
}

/*
 * _rdc_alloc_handle
 *      Allocate a handle
 *
 */

static nsc_buf_t *
_rdc_alloc_handle(void (*d_cb)(), void (*r_cb)(), void (*w_cb)(), rdc_fd_t *rfd)
{
        rdc_buf_t *h;

        h = kmem_zalloc(sizeof (*h), KM_SLEEP);
        if (!h)
                return (NULL);

        h->rdc_bufp = nsc_alloc_handle(RDC_FD(rfd), d_cb, r_cb, w_cb);
        if (!h->rdc_bufp) {
                if (!IS_RFAILED(rfd->rdc_info)) {
                        /*
                         * This is a real failure from the io provider below.
                         */
                        kmem_free(h, sizeof (*h));
                        return (NULL);
                } else {
                        /* EMPTY */
                        /*
                         * This is just a failed primary device where
                         * we can do remote io to the secondary.
                         */
                }
        }

        h->rdc_bufh.sb_flag = NSC_HALLOCATED;
        h->rdc_fd = rfd;
        mutex_init(&h->aio_lock, NULL, MUTEX_DRIVER, NULL);

        return (&h->rdc_bufh);
}


/*
 * _rdc_free_handle
 *      Free a handle
 *
 */

/* ARGSUSED */
static int
_rdc_free_handle(rdc_buf_t *h, rdc_fd_t *rfd)
{
        int rc;

        mutex_destroy(&h->aio_lock);
        if (h->rdc_bufp) {
                rc = nsc_free_handle(h->rdc_bufp);
                if (!RDC_SUCCESS(rc))
                        return (rc);
        }
        kmem_free(h, sizeof (rdc_buf_t));
        return (0);
}


/*
 * _rdc_attach
 *      Attach
 *
 * Calling/Exit State:
 *      Returns 0 for success, errno on failure.
 *
 * Description:
 */

static int
_rdc_attach(rdc_fd_t *rfd, nsc_iodev_t *iodev)
{
        rdc_k_info_t *krdc;
        int raw = RDC_IS_RAW(rfd);
        int rc;

        if ((RDC_IS_BMP(rfd)) || RDC_IS_QUE(rfd))
                return (EINVAL);

        krdc = rfd->rdc_info;
        if (krdc == NULL)
                return (EINVAL);

        mutex_enter(&krdc->devices->id_rlock);
        krdc->iodev = iodev;
        mutex_exit(&krdc->devices->id_rlock);

        rc = _rdc_rsrv_devs(krdc, (raw ? RDC_RAW : RDC_CACHE), RDC_EXTERNAL);
        return (rc);
}


/*
 * _rdc_detach
 *      Detach
 *
 * Calling/Exit State:
 *      Returns 0 for success, always succeeds
 *
 * Description:
 */

static int
_rdc_detach(rdc_fd_t *rfd, nsc_iodev_t *iodev)
{
        rdc_k_info_t *krdc = rfd->rdc_info;
        int raw = RDC_IS_RAW(rfd);

        /*
         * Flush the async queue if necessary.
         */

        if (IS_ASYNC(&rdc_u_info[krdc->index]) && !RDC_IS_DISKQ(krdc->group)) {
                int tries = 1;

                while (krdc->group->ra_queue.blocks != 0 && tries--) {
                        if (!krdc->group->rdc_writer)
                                (void) rdc_writer(krdc->index);

                        (void) rdc_drain_queue(krdc->index);
                }

                /* force discard of possibly blocked flusher threads */
1620                 if (rdc_drain_queue(krdc->index) != 0) {
1621 #ifdef DEBUG
1622                         net_queue *qp = &krdc->group->ra_queue;
1623 #endif
1624                         do {
1625                                 mutex_enter(&krdc->group->ra_queue.net_qlock);
1626                                 krdc->group->asyncdis = 1;
1627                                 cv_broadcast(&krdc->group->asyncqcv);
1628                                 mutex_exit(&krdc->group->ra_queue.net_qlock);
1629                                 cmn_err(CE_WARN,
1630                                     "!RDC: async I/O pending and not drained "
1631                                     "for %s during detach",
1632                                     rdc_u_info[krdc->index].primary.file);
1633 #ifdef DEBUG
1634                                 cmn_err(CE_WARN,
1635                                     "!nitems: %" NSC_SZFMT " nblocks: %"
1636                                     NSC_SZFMT " head: 0x%p tail: 0x%p",
1637                                     qp->nitems, qp->blocks,
1638                                     (void *)qp->net_qhead,
1639                                     (void *)qp->net_qtail);
1640 #endif
1641                         } while (krdc->group->rdc_thrnum > 0);
1642                 }
1643         }
1644 
1645         mutex_enter(&krdc->devices->id_rlock);
1646         if (krdc->iodev != iodev)
1647                 cmn_err(CE_WARN, "!_rdc_detach: iodev mismatch %p : %p",
1648                     (void *) krdc->iodev, (void *) iodev);
1649 
1650         krdc->iodev = NULL;
1651         mutex_exit(&krdc->devices->id_rlock);
1652 
1653         _rdc_rlse_devs(krdc, (raw ? RDC_RAW : RDC_CACHE));
1654 
1655         return (0);
1656 }
1657 
1658 /*
1659  * _rdc_get_pinned
1660  *
1661  * only affects local node.
1662  */
1663 
1664 static int
1665 _rdc_get_pinned(rdc_fd_t *rfd)
1666 {
1667         return (nsc_get_pinned(RDC_FD(rfd)));
1668 }
1669 
1670 /*
1671  * _rdc_discard_pinned
1672  *
1673  * only affects local node.
1674  */
1675 
1676 static int
1677 _rdc_discard_pinned(rdc_fd_t *rfd, nsc_off_t pos, nsc_size_t len)
1678 {
1679         return (nsc_discard_pinned(RDC_FD(rfd), pos, len));
1680 }
1681 
1682 /*
1683  * _rdc_partsize
1684  *
1685  * only affects the local node.
1686  */
1687 
1688 static int
1689 _rdc_partsize(rdc_fd_t *rfd, nsc_size_t *ptr)
1690 {
1691         rdc_u_info_t *urdc;
1692 
1693         urdc = &rdc_u_info[rfd->rdc_info->index];
1694         /* Always return saved size */
1695         ASSERT(urdc->volume_size != 0);
1696         *ptr = urdc->volume_size;
1697         return (0);
1698 }
1699 
1700 /*
1701  * _rdc_maxfbas
1702  *
1703  * only affects local node
1704  */
1705 
1706 /* ARGSUSED */
1707 static int
1708 _rdc_maxfbas(rdc_fd_t *rfd, int flag, nsc_size_t *ptr)
1709 {
1710         rdc_k_info_t *krdc = rfd->rdc_info;
1711         int raw = RDC_IS_RAW(rfd);
1712         int rtype = raw ? RDC_RAW : RDC_CACHE;
1713         int rc = 0;
1714 
1715         if (krdc == NULL)
1716                 return (EINVAL);
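        /*
         * NSC_RDAHEAD and NSC_CACHEBLK queries are passed through to
         * the underlying device (reserved around the call); the default
         * query is answered from the value saved by _rdc_attach_fd(),
         * so it needs no reserve.
         */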
1717         if (flag == NSC_RDAHEAD || flag == NSC_CACHEBLK) {
1718                 rc = _rdc_rsrv_devs(krdc, rtype, RDC_INTERNAL);
1719                 if (rc == 0) {
1720                         rc = nsc_maxfbas(RDC_U_FD(krdc), flag, ptr);
1721                         _rdc_rlse_devs(krdc, rtype);
1722                 }
1723         } else {
1724                 /* Always return saved size */
1725                 ASSERT(krdc->maxfbas != 0);
1726                 *ptr = krdc->maxfbas - 1;
1727         }
1728 
1729         return (rc);
1730 }
1731 
1732 /* ARGSUSED */
1733 static int
1734 _rdc_control(rdc_fd_t *rfd, int cmd, void *ptr, int len)
1735 {
        return (nsc_control(RDC_FD(rfd), cmd, ptr, len));
1737 }
1738 
1739 /*
1740  * _rdc_attach_fd
1741  *
1742  * called by nsctl as part of nsc_reserve() processing when one of
1743  * SNDR's underlying file descriptors becomes available and metadata
1744  * should be re-acquired.
1745  */
1746 static int
1747 _rdc_attach_fd(blind_t arg)
1748 {
1749         _rdc_info_dev_t *dip = (_rdc_info_dev_t *)arg;
1750         rdc_k_info_t *krdc;
1751         rdc_u_info_t *urdc;
1752         nsc_size_t maxfbas, partsize;
1753         int rc;
1754 
1755         krdc = dip->bi_krdc;
1756         urdc = &rdc_u_info[krdc->index];
1757 
1758         if ((rc = nsc_partsize(dip->bi_fd, &partsize)) != 0) {
1759                 cmn_err(CE_WARN,
1760                     "!SNDR: cannot get volume size of %s, error %d",
1761                     nsc_pathname(dip->bi_fd), rc);
1762         } else if (urdc->volume_size == 0 && partsize > 0) {
1763                 /* set volume size for the first time */
1764                 urdc->volume_size = partsize;
1765         } else if (urdc->volume_size != partsize) {
1766                 /*
1767                  * SNDR cannot yet cope with a volume being resized,
1768                  * so fail it.
1769                  */
1770                 if (!(rdc_get_vflags(urdc) & RDC_VOL_FAILED)) {
1771                         rdc_many_enter(krdc);
1772                         if (rdc_get_vflags(urdc) & RDC_PRIMARY)
1773                                 rdc_set_mflags(urdc, RDC_RSYNC_NEEDED);
1774                         else
1775                                 rdc_set_mflags(urdc, RDC_SYNC_NEEDED);
1776                         rdc_set_flags_log(urdc, RDC_VOL_FAILED,
1777                             "volume resized");
1778                         rdc_many_exit(krdc);
1779                         rdc_write_state(urdc);
1780                 }
1781 
1782                 cmn_err(CE_WARN,
1783                     "!SNDR: %s changed size from %" NSC_SZFMT " to %" NSC_SZFMT,
1784                     nsc_pathname(dip->bi_fd), urdc->volume_size, partsize);
1785         }
1786 
1787         if ((rc = nsc_maxfbas(dip->bi_fd, 0, &maxfbas)) != 0) {
1788                 cmn_err(CE_WARN,
1789                     "!SNDR: cannot get max transfer size for %s, error %d",
1790                     nsc_pathname(dip->bi_fd), rc);
1791         } else if (maxfbas > 0) {
1792                 krdc->maxfbas = min(RDC_MAX_MAXFBAS, maxfbas);
1793         }
1794 
1795         return (0);
1796 }
1797 
1798 
1799 /*
1800  * _rdc_pinned
1801  *
1802  * only affects local node
1803  */
1804 
1805 static void
1806 _rdc_pinned(_rdc_info_dev_t *dip, nsc_off_t pos, nsc_size_t len)
1807 {
1808         nsc_pinned_data(dip->bi_krdc->iodev, pos, len);
1809 }
1810 
1811 
1812 /*
1813  * _rdc_unpinned
1814  *
1815  * only affects local node.
1816  */
1817 
1818 static void
1819 _rdc_unpinned(_rdc_info_dev_t *dip, nsc_off_t pos, nsc_size_t len)
1820 {
1821         nsc_unpinned_data(dip->bi_krdc->iodev, pos, len);
1822 }
1823 
1824 
1825 /*
1826  * _rdc_read
1827  *
 * read the specified data into the buffer - go remote if local down,
 * or if the remote end has more recent data because a reverse sync is
 * in progress.
1831  */
1832 
1833 static int
1834 _rdc_read(rdc_buf_t *h, nsc_off_t pos, nsc_size_t len, int flag)
1835 {
1836         rdc_k_info_t *krdc = h->rdc_fd->rdc_info;
1837         rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
1838         int remote = (RDC_REMOTE(h) || (rdc_get_mflags(urdc) & RDC_SLAVE));
1839         int rc1, rc2;
1840 
1841         rc1 = rc2 = 0;
1842 
1843         if (!RDC_HANDLE_LIMITS(&h->rdc_bufh, pos, len)) {
1844                 cmn_err(CE_WARN,
1845                     "!_rdc_read: bounds check: io(handle) pos %" NSC_XSZFMT
1846                     "(%" NSC_XSZFMT ") len %" NSC_XSZFMT "(%" NSC_XSZFMT ")",
1847                     pos, h->rdc_bufh.sb_pos, len, h->rdc_bufh.sb_len);
1848                 h->rdc_bufh.sb_error = EINVAL;
1849                 return (h->rdc_bufh.sb_error);
1850         }
1851 
1852         if (flag & NSC_NOBLOCK) {
1853                 cmn_err(CE_WARN,
1854                     "!_rdc_read: removing unsupported NSC_NOBLOCK flag");
1855                 flag &= ~(NSC_NOBLOCK);
1856         }
1857 
1858 
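        /*
         * Read locally unless the set is in remote io mode, or a reverse
         * sync is in progress (RDC_SLAVE) and the remote data is more
         * recent.  A failed local read also falls back to the remote node.
         */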
1859         if (!remote) {
1860                 rc1 = nsc_read(h->rdc_bufp, pos, len, flag);
1861         }
1862 
1863         if (remote || !RDC_SUCCESS(rc1)) {
1864                 rc2 = _rdc_remote_read(krdc, &h->rdc_bufh, pos, len, flag);
1865         }
1866 
1867         if (remote && !RDC_SUCCESS(rc2))
1868                 h->rdc_bufh.sb_error = rc2;
1869         else if (!RDC_SUCCESS(rc1) && !RDC_SUCCESS(rc2))
1870                 h->rdc_bufh.sb_error = rc1;
1871 
1872         return (h->rdc_bufh.sb_error);
1873 }
1874 
1875 
1876 static int
1877 _rdc_remote_write(rdc_k_info_t *krdc, rdc_buf_t *h, nsc_buf_t *nsc_h,
1878     nsc_off_t pos, nsc_size_t len, int flag, uint_t bitmask)
1879 {
1880         rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
1881         int rc = 0;
1882         nsc_size_t plen, syncblockpos;
1883         aio_buf_t *anon = NULL;
1884 
1885         if (!(rdc_get_vflags(urdc) & RDC_PRIMARY))
1886                 return (EINVAL);
1887 
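        /*
         * If the set is logging and not draining a queue, the write has
         * already been recorded in the bitmap by our caller and there is
         * nothing to send now, so skip to the handle cleanup at 'done'.
         */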
1888         if ((rdc_get_vflags(urdc) & RDC_LOGGING) &&
1889             (!IS_STATE(urdc, RDC_QUEUING))) {
1890                 goto done;
1891         }
1892 
        /*
         * this check for RDC_SYNCING may seem redundant, but there is a window
         * in rdc_sync where an async set has not yet been transformed into a
         * sync set.
         */
1898         if ((!IS_ASYNC(urdc) || IS_STATE(urdc, RDC_SYNCING)) ||
1899             RDC_REMOTE(h) ||
1900             krdc->group->synccount > 0 ||
1901             (rdc_get_vflags(urdc) & RDC_SLAVE) ||
1902             (rdc_get_vflags(urdc) & RDC_VOL_FAILED) ||
1903             (rdc_get_vflags(urdc) & RDC_BMP_FAILED)) {
1904 
1905                 /* sync mode, or remote io mode, or local device is dead */
1906                 rc = rdc_net_write(krdc->index, krdc->remote_index,
1907                     nsc_h, pos, len, RDC_NOSEQ, RDC_NOQUE, NULL);
1908 
1909                 if ((rc == 0) &&
1910                     !(rdc_get_vflags(urdc) & RDC_BMP_FAILED) &&
1911                     !(rdc_get_vflags(urdc) & RDC_VOL_FAILED)) {
                        if ((IS_STATE(urdc, RDC_SYNCING) &&
                            !IS_STATE(urdc, RDC_FULL)) ||
                            !IS_STATE(urdc, RDC_SLAVE)) {
1915                                 mutex_enter(&krdc->syncbitmutex);
1916 
1917                                 syncblockpos = LOG_TO_FBA_NUM(krdc->syncbitpos);
1918 
1919                                 DTRACE_PROBE4(rdc_remote_write,
1920                                     nsc_off_t, krdc->syncbitpos,
1921                                     nsc_off_t, syncblockpos,
1922                                     nsc_off_t, pos,
1923                                     nsc_size_t, len);
1924 
                                /*
                                 * If the current I/O's position plus length
                                 * is greater than the sync block position,
                                 * only clear those blocks up to the sync
                                 * block position.
                                 */
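                                /*
                                 * Worked example (hypothetical numbers):
                                 * with syncblockpos == 1000 and an I/O of
                                 * pos == 990, len == 20, only plen ==
                                 * 1000 - 990 == 10 blocks are cleared;
                                 * the 10 blocks at or beyond the sync
                                 * point are left dirty, presumably so the
                                 * advancing sync still handles them.
                                 */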
1930                                 if (pos < syncblockpos) {
1931                                         if ((pos + len) > syncblockpos)
1932                                                 plen = syncblockpos - pos;
1933                                         else
1934                                                 plen = len;
1935                                         RDC_CLR_BITMAP(krdc, pos, plen, bitmask,
1936                                             RDC_BIT_BUMP);
1937                                 }
1938                                 mutex_exit(&krdc->syncbitmutex);
1939                         } else {
1940                                 RDC_CLR_BITMAP(krdc, pos, len, bitmask,
1941                                     RDC_BIT_BUMP);
1942                         }
1943                 } else if (rc != 0) {
1944                         rdc_group_enter(krdc);
1945                         rdc_set_flags_log(urdc, RDC_LOGGING,
1946                             "net write failed");
1947                         rdc_write_state(urdc);
1948                         if (rdc_get_vflags(urdc) & RDC_SYNCING)
1949                                 krdc->disk_status = 1;
1950                         rdc_group_exit(krdc);
1951                 }
1952         } else if (!IS_STATE(urdc, RDC_SYNCING)) {
1953                 DTRACE_PROBE1(async_enque_start, rdc_buf_t *, h);
1954 
1955                 ASSERT(krdc->group->synccount == 0);
1956                 /* async mode */
1957                 if ((h == NULL) || ((h->rdc_flags & RDC_ASYNC_VEC) == 0)) {
1958 
1959                         rc = _rdc_enqueue_write(krdc, pos, len, flag, NULL);
1960 
1961                 } else {
1962                         anon = rdc_aio_buf_get(h, krdc->index);
1963                         if (anon == NULL) {
1964 #ifdef DEBUG
1965                                 cmn_err(CE_WARN,
1966                                     "!enqueue write failed for handle %p",
1967                                     (void *) h);
1968 #endif
1969                                 return (EINVAL);
1970                         }
1971                         rc = _rdc_enqueue_write(krdc, pos, len, flag,
1972                             anon->rdc_abufp);
1973 
                        /*
                         * Get rid of the aio_buf_t now, as this may not
                         * be the set that this rdc_buf was allocated on.
                         * We are done with it anyway; the enqueuing code
                         * frees the nsc_abuf.
                         */
1980                         rdc_aio_buf_del(h, krdc);
1981                 }
1982 
1983         } else {
1984                 ASSERT(IS_STATE(urdc, RDC_SYNCING));
1985                 ASSERT(0);
1986         }
1987 
1988 done:
1989         if ((anon == NULL) && h && (h->rdc_flags & RDC_ASYNC_VEC)) {
1990                 /*
1991                  * Toss the anonymous buffer if we have one allocated.
1992                  */
1993                 anon = rdc_aio_buf_get(h, krdc->index);
1994                 if (anon) {
1995                         (void) nsc_free_buf(anon->rdc_abufp);
1996                         rdc_aio_buf_del(h, krdc);
1997                 }
1998         }
1999 
2000         return (rc);
2001 }
2002 
2003 /*
2004  * _rdc_multi_write
2005  *
 * Send to the multi-hop remote. Obeys 1-to-many configurations if present
 * and we are crazy enough to support them.
2008  *
2009  */
2010 int
2011 _rdc_multi_write(nsc_buf_t *h, nsc_off_t pos, nsc_size_t len, int flag,
2012     rdc_k_info_t *krdc)
2013 {
2014         rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
2015         rdc_k_info_t *this = krdc;      /* krdc that was requested */
2016         int rc, retval;
2017         uint_t bitmask;
2018 
2019         retval = rc = 0;
2020         if (!RDC_HANDLE_LIMITS(h, pos, len)) {
2021                 cmn_err(CE_WARN,
2022                     "!_rdc_multi_write: bounds check: io(handle) pos %"
2023                     NSC_XSZFMT "(%" NSC_XSZFMT ") len %" NSC_XSZFMT "(%"
2024                     NSC_XSZFMT ")", pos, h->sb_pos, len, h->sb_len);
2025                 return (EINVAL);
2026         }
2027 
2028         /* if this is a 1 to many, set all the bits for all the sets */
2029         do {
2030                 if (RDC_SET_BITMAP(krdc, pos, len, &bitmask) < 0) {
2031                         (void) nsc_uncommit(h, pos, len, flag);
2032                         /* set the error, but try other sets */
2033                         retval = EIO;
2034                 }
2035                 if (IS_MANY(krdc) && IS_STATE(urdc, RDC_PRIMARY)) {
2036                         rdc_many_enter(krdc);
2037                         for (krdc = krdc->many_next; krdc != this;
2038                             krdc = krdc->many_next) {
2039                                 urdc = &rdc_u_info[krdc->index];
2040                                 if (!IS_ENABLED(urdc))
2041                                         continue;
2042                                 break;
2043                         }
2044                         rdc_many_exit(krdc);
2045                 }
2046         } while (krdc != this);
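        /*
         * The many-chain is circular: the loop above advances krdc to
         * the next enabled set after each bitmap update and terminates
         * once it has walked all the way back to the starting set.
         */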
2047 
2048         urdc = &rdc_u_info[krdc->index];
2049 
2050         if (flag & NSC_NOBLOCK) {
2051                 cmn_err(CE_WARN,
2052                     "!_rdc_multi_write: removing unsupported NSC_NOBLOCK flag");
2053                 flag &= ~(NSC_NOBLOCK);
2054         }
2055 
2056 multiwrite1:
2057         if ((rdc_get_vflags(urdc) & RDC_PRIMARY) &&
2058             (!IS_STATE(urdc, RDC_LOGGING) ||
2059             (IS_STATE(urdc, RDC_LOGGING) &&
2060             IS_STATE(urdc, RDC_QUEUING)))) {
2061                 rc = _rdc_remote_write(krdc, NULL, h, pos, len, flag, bitmask);
2062         }
2063 
2064         if (!RDC_SUCCESS(rc) && retval == 0) {
2065                 retval = rc;
2066         }
2067 
2068 multiwrite2:
        if (IS_MANY(krdc) && (rdc_get_vflags(urdc) & RDC_PRIMARY)) {
2070                 rdc_many_enter(krdc);
2071                 for (krdc = krdc->many_next; krdc != this;
2072                     krdc = krdc->many_next) {
2073                         urdc = &rdc_u_info[krdc->index];
2074                         if (!IS_ENABLED(urdc))
2075                                 continue;
2076                         rc = 0;
2077                         rdc_many_exit(krdc);
2078 
2079                         goto multiwrite1;
2080                 }
2081                 rdc_many_exit(krdc);
2082         }
2083 
2084         return (retval);
2085 }
2086 
2087 void
2088 _rdc_diskq_enqueue_thr(rdc_aio_t *p)
2089 {
2090         rdc_thrsync_t *sync = (rdc_thrsync_t *)p->next;
2091         rdc_k_info_t *krdc = &rdc_k_info[p->index];
2092         int rc2;
2093 
2094 
2095         rc2 = rdc_diskq_enqueue(krdc, p);
2096 
2097         /*
2098          * overload flag with error return if any
2099          */
2100         if (!RDC_SUCCESS(rc2)) {
2101                 p->flag = rc2;
2102         } else {
2103                 p->flag = 0;
2104         }
2105         mutex_enter(&sync->lock);
2106         sync->complete++;
2107         cv_broadcast(&sync->cv);
2108         mutex_exit(&sync->lock);
2109 }
2110 
2111 /*
2112  * _rdc_sync_write_thr
 * synchronous write thread which writes to the network while the
 * local write is occurring
2115  */
2116 void
2117 _rdc_sync_write_thr(rdc_aio_t *p)
2118 {
2119         rdc_thrsync_t *sync = (rdc_thrsync_t *)p->next;
2120         rdc_buf_t *h = (rdc_buf_t *)p->handle;
2121         rdc_k_info_t *krdc = &rdc_k_info[p->index];
2122 #ifdef  DEBUG
2123         rdc_u_info_t *urdc;
2124 #endif
2125         int rc2;
2126         int bitmask;
2127 
2128         rdc_group_enter(krdc);
2129         krdc->aux_state |= RDC_AUXWRITE;
2130 #ifdef  DEBUG
2131         urdc = &rdc_u_info[krdc->index];
2132         if (!IS_ENABLED(urdc)) {
2133                 cmn_err(CE_WARN, "!rdc_sync_write_thr: set not enabled %s:%s",
2134                     urdc->secondary.file,
2135                     urdc->secondary.bitmap);
2136         }
2137 #endif
2138         rdc_group_exit(krdc);
2139         bitmask = p->iostatus;       /* overload */
2140         rc2 = _rdc_remote_write(krdc, h, &h->rdc_bufh, p->pos, p->len,
2141             p->flag, bitmask);
2142 
2143 
2144         /*
2145          * overload flag with error return if any
2146          */
2147         if (!RDC_SUCCESS(rc2)) {
2148                 p->flag = rc2;
2149         } else {
2150                 p->flag = 0;
2151         }
2152 
2153         rdc_group_enter(krdc);
2154         krdc->aux_state &= ~RDC_AUXWRITE;
2155         rdc_group_exit(krdc);
2156 
2157         mutex_enter(&sync->lock);
2158         sync->complete++;
2159         cv_broadcast(&sync->cv);
2160         mutex_exit(&sync->lock);
2161 }
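
/*
 * Both thread bodies above rendezvous with their creator through the
 * rdc_thrsync_t carried in the rdc_aio_t: each worker increments
 * sync->complete under sync->lock and broadcasts sync->cv.  The creator
 * (_rdc_write) bumps sync->threads for each successful nst_create() and
 * later waits for the workers with:
 *
 *      mutex_enter(&sync->lock);
 *      while (sync->complete != sync->threads)
 *              cv_wait(&sync->cv, &sync->lock);
 *      mutex_exit(&sync->lock);
 */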
2162 
2163 /*
2164  * _rdc_write
2165  *
2166  * Commit changes to the buffer locally and send remote.
2167  *
2168  * If this write is whilst the local primary volume is being synced,
2169  * then we write the remote end first to ensure that the new data
2170  * cannot be overwritten by a concurrent sync operation.
2171  */
2172 
2173 static int
2174 _rdc_write(rdc_buf_t *h, nsc_off_t pos, nsc_size_t len, int flag)
2175 {
2176         rdc_k_info_t *krdc = h->rdc_fd->rdc_info;
2177         rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
2178         rdc_k_info_t *this;
2179         rdc_k_info_t *multi = NULL;
2180         int remote = RDC_REMOTE(h);
2181         int rc1, rc2;
2182         uint_t bitmask;
2183         int first;
2184         int rsync;
2185         int nthr;
2186         int winddown;
2187         int thrrc = 0;
2188         rdc_aio_t *bp[SNDR_MAXTHREADS];
2189         aio_buf_t *anon;
2190         nsthread_t  *tp;
2191         rdc_thrsync_t *sync = &h->rdc_sync;
2192 
2193         /* If this is the multi-hop secondary, move along to the primary */
2194         if (IS_MULTI(krdc) && !IS_PRIMARY(urdc)) {
2195                 multi = krdc;
2196                 krdc = krdc->multi_next;
2197                 urdc = &rdc_u_info[krdc->index];
2198 
2199                 if (!IS_ENABLED(urdc)) {
2200                         krdc = h->rdc_fd->rdc_info;
2201                         urdc = &rdc_u_info[krdc->index];
2202                         multi = NULL;
2203                 }
2204         }
2205         this = krdc;
2206 
2207         rsync = (IS_PRIMARY(urdc)) && (IS_SLAVE(urdc));
2208 
2209         /*
2210          * If this is a many group with a reverse sync in progress and
2211          * this is not the slave krdc/urdc, then search for the slave
2212          * so that we can do the remote io to the correct secondary
2213          * before the local io.
2214          */
2215         if (rsync && !(IS_SLAVE(urdc))) {
2216                 rdc_many_enter(krdc);
2217                 for (krdc = krdc->many_next; krdc != this;
2218                     krdc = krdc->many_next) {
2219                         urdc = &rdc_u_info[krdc->index];
2220                         if (!IS_ENABLED(urdc))
2221                                 continue;
2222                         if (rdc_get_vflags(urdc) & RDC_SLAVE)
2223                                 break;
2224                 }
2225                 rdc_many_exit(krdc);
2226 
2227                 this = krdc;
2228         }
2229 
2230         urdc = &rdc_u_info[krdc->index];
2231 
2232         rc1 = rc2 = 0;
2233         first = 1;
2234         nthr = 0;
2235         if (!RDC_HANDLE_LIMITS(&h->rdc_bufh, pos, len)) {
2236                 cmn_err(CE_WARN,
2237                     "!_rdc_write: bounds check: io(handle) pos %" NSC_XSZFMT
2238                     "(%" NSC_XSZFMT ") len %" NSC_XSZFMT "(%" NSC_XSZFMT ")",
2239                     pos, h->rdc_bufh.sb_pos, len, h->rdc_bufh.sb_len);
2240                 h->rdc_bufh.sb_error = EINVAL;
2241                 return (h->rdc_bufh.sb_error);
2242         }
2243 
2244         DTRACE_PROBE(rdc_write_bitmap_start);
2245 
2246         /* if this is a 1 to many, set all the bits for all the sets */
2247         do {
2248                 if (RDC_SET_BITMAP(krdc, pos, len, &bitmask) < 0) {
2249                         if (rdc_eio_nobmp) {
2250                                 (void) nsc_uncommit
2251                                     (h->rdc_bufp, pos, len, flag);
2252                                 /* set the error, but try the other sets */
2253                                 h->rdc_bufh.sb_error = EIO;
2254                         }
2255                 }
2256 
2257                 if (IS_MANY(krdc) && IS_STATE(urdc, RDC_PRIMARY)) {
2258                         rdc_many_enter(krdc);
2259                         for (krdc = krdc->many_next; krdc != this;
2260                             krdc = krdc->many_next) {
2261                                 urdc = &rdc_u_info[krdc->index];
2262                                 if (!IS_ENABLED(urdc))
2263                                         continue;
2264                                 break;
2265                         }
2266                         rdc_many_exit(krdc);
2267                 }
2268 
2269         } while (krdc != this);
2270 
2271         urdc = &rdc_u_info[krdc->index];
2272 
2273         DTRACE_PROBE(rdc_write_bitmap_end);
2274 
2275 write1:
2276         /* just in case we switch mode during write */
2277         if (IS_ASYNC(urdc) && (!IS_STATE(urdc, RDC_SYNCING)) &&
2278             (!IS_STATE(urdc, RDC_LOGGING) ||
2279             IS_STATE(urdc, RDC_QUEUING))) {
2280                 h->rdc_flags |= RDC_ASYNC_BUF;
2281         }
2282         if (BUF_IS_ASYNC(h)) {
2283                 /*
2284                  * We are async mode
2285                  */
2286                 aio_buf_t *p;
2287                 DTRACE_PROBE(rdc_write_async_start);
2288 
2289                 if ((krdc->type_flag & RDC_DISABLEPEND) ||
2290                     ((IS_STATE(urdc, RDC_LOGGING) &&
2291                     !IS_STATE(urdc, RDC_QUEUING)))) {
2292                         goto localwrite;
2293                 }
2294                 if (IS_STATE(urdc, RDC_VOL_FAILED)) {
2295                         /*
                         * overload remote as we don't want to do local
                         * IO later; forge ahead with async
2298                          */
2299                         remote++;
2300                 }
2301                 if ((IS_STATE(urdc, RDC_SYNCING)) ||
2302                     (IS_STATE(urdc, RDC_LOGGING) &&
2303                     !IS_STATE(urdc, RDC_QUEUING))) {
2304                         goto localwrite;
2305                 }
2306 
2307                 p = rdc_aio_buf_add(krdc->index, h);
2308                 if (p == NULL) {
2309 #ifdef DEBUG
2310                         cmn_err(CE_WARN,
                            "!rdc_alloc_buf aio_buf allocation failed");
2312 #endif
2313                         goto localwrite;
2314                 }
2315 
2316                 mutex_enter(&h->aio_lock);
2317 
2318                 DTRACE_PROBE(rdc_write_async__allocabuf_start);
2319                 rc1 = nsc_alloc_abuf(pos, len, 0, &p->rdc_abufp);
2320                 DTRACE_PROBE(rdc_write_async__allocabuf_end);
2321                 if (!RDC_SUCCESS(rc1)) {
2322 #ifdef DEBUG
2323                         cmn_err(CE_WARN,
2324                             "!rdc_alloc_buf NSC_ANON allocation failed rc %d",
2325                             rc1);
2326 #endif
2327                         mutex_exit(&h->aio_lock);
2328                         goto localwrite;
2329                 }
2330                 h->rdc_flags |= RDC_ASYNC_VEC;
2331                 mutex_exit(&h->aio_lock);
2332 
2333                 /*
2334                  * Copy buffer into anonymous buffer
2335                  */
2336 
2337                 DTRACE_PROBE(rdc_write_async_nsccopy_start);
2338                 rc1 =
2339                     nsc_copy(&h->rdc_bufh, p->rdc_abufp, pos, pos, len);
2340                 DTRACE_PROBE(rdc_write_async_nsccopy_end);
2341                 if (!RDC_SUCCESS(rc1)) {
2342 #ifdef DEBUG
2343                         cmn_err(CE_WARN,
2344                             "!_rdc_write: nsc_copy failed rc=%d state %x",
2345                             rc1, rdc_get_vflags(urdc));
2346 #endif
2347                         rc1 = nsc_free_buf(p->rdc_abufp);
2348                         rdc_aio_buf_del(h, krdc);
2349                         rdc_group_enter(krdc);
2350                         rdc_group_log(krdc, RDC_FLUSH|RDC_OTHERREMOTE,
2351                             "nsc_copy failure");
2352                         rdc_group_exit(krdc);
2353                 }
2354                 DTRACE_PROBE(rdc_write_async_end);
2355 
                /*
                 * If using a diskq, launch a thread to queue it
                 * and to free the aio->h and the aio; if the thread
                 * fails, do it the old way (see localwrite).
                 */
2361 
2362                 if (RDC_IS_DISKQ(krdc->group)) {
2363 
2364                         if (nthr >= SNDR_MAXTHREADS) {
2365 #ifdef DEBUG
2366                                 cmn_err(CE_NOTE, "!nthr overrun in _rdc_write");
2367 #endif
2368                                 thrrc = ENOEXEC;
2369                                 goto localwrite;
2370                         }
2371 
2372                         anon = rdc_aio_buf_get(h, krdc->index);
2373                         if (anon == NULL) {
2374 #ifdef DEBUG
2375                                 cmn_err(CE_WARN, "!rdc_aio_buf_get failed for "
2376                                     "%p", (void *)h);
2377 #endif
2378                                 thrrc = ENOEXEC;
2379                                 goto localwrite;
2380                         }
2381 
2382                         /* get a populated rdc_aio_t */
2383                         bp[nthr] =
2384                             rdc_aio_tbuf_get(sync, anon->rdc_abufp, pos, len,
2385                             flag, krdc->index, bitmask);
2386 
2387                         if (bp[nthr] == NULL) {
2388 #ifdef DEBUG
                                cmn_err(CE_NOTE, "!_rdc_write: "
                                    "kmem_alloc failed bp aio (1)");
2391 #endif
2392                                 thrrc = ENOEXEC;
2393                                 goto localwrite;
2394                         }
2395                         /* start the queue io */
2396                         tp = nst_create(_rdc_ioset, _rdc_diskq_enqueue_thr,
2397                             (void *)bp[nthr], NST_SLEEP);
2398 
2399                         if (tp == NULL) {
2400 #ifdef DEBUG
                                cmn_err(CE_NOTE,
                                    "!_rdc_write: nst_create failure");
2403 #endif
2404                                 thrrc = ENOEXEC;
2405                         } else {
2406                                 mutex_enter(&(sync->lock));
2407                                 sync->threads++;
2408                                 mutex_exit(&(sync->lock));
2409                                 nthr++;
2410 
2411                         }
                        /*
                         * The handle that is to be enqueued is now in
                         * the rdc_aio_t, and will be freed there.
                         * Dump the aio_buf_t now; if this is 1-to-many,
                         * we may not get to do this in _rdc_free_buf(),
                         * as this may not be the index the rdc_buf_t
                         * was allocated on.
                         */
2420                         rdc_aio_buf_del(h, krdc);
2421 
2422                 }
2423         }       /* end of async */
2424 
        /*
         * We try to overlap local and network IO for the sync case
         * (we already do it for async).  If one-to-many, we need to
         * track the resulting nst_thread so we don't trash the nsc_buf
         * on a free.  Start the network IO first, then do the local
         * (sync only).
         */
2432 
2433         if (IS_PRIMARY(urdc) && !IS_STATE(urdc, RDC_LOGGING) &&
2434             !BUF_IS_ASYNC(h)) {
2435                 /*
2436                  * if forward syncing, we must do local IO first
2437                  * then remote io. Don't spawn thread
2438                  */
2439                 if (!rsync && (IS_STATE(urdc, RDC_SYNCING))) {
2440                         thrrc = ENOEXEC;
2441                         goto localwrite;
2442                 }
2443                 if (IS_MULTI(krdc)) {
2444                         rdc_k_info_t *ktmp;
2445                         rdc_u_info_t *utmp;
2446 
2447                         ktmp = krdc->multi_next;
2448                         utmp = &rdc_u_info[ktmp->index];
2449                         if (IS_ENABLED(utmp))
2450                                 multi = ktmp;
2451                 }
2452                 if (nthr >= SNDR_MAXTHREADS) {
2453 #ifdef DEBUG
2454                         cmn_err(CE_NOTE, "!nthr overrun in _rdc_write");
2455 #endif
2456                         thrrc = ENOEXEC;
2457                         goto localwrite;
2458                 }
2459 
2460                 bp[nthr] = rdc_aio_tbuf_get(sync, h, pos, len,
2461                     flag, krdc->index, bitmask);
2462 
2463                 if (bp[nthr] == NULL) {
2464                         thrrc = ENOEXEC;
2465                         goto localwrite;
2466                 }
2467                 tp = nst_create(_rdc_ioset, _rdc_sync_write_thr,
2468                     (void *)bp[nthr], NST_SLEEP);
2469                 if (tp == NULL) {
2470 #ifdef DEBUG
                        cmn_err(CE_NOTE, "!_rdc_write: nst_create failure");
2472 #endif
2473                         thrrc = ENOEXEC;
2474                 } else {
2475                         mutex_enter(&(sync->lock));
2476                         sync->threads++;
2477                         mutex_exit(&(sync->lock));
2478                         nthr++;
2479                 }
2480         }
2481 localwrite:
2482         if (!remote && !rsync && first) {
2483                 DTRACE_PROBE(rdc_write_nscwrite_start);
2484                 rc1 = nsc_write(h->rdc_bufp, pos, len, flag);
2485                 DTRACE_PROBE(rdc_write_nscwrite_end);
2486                 if (!RDC_SUCCESS(rc1)) {
2487                         rdc_many_enter(krdc);
2488                         if (IS_PRIMARY(urdc))
2489                                 /* Primary, so reverse sync needed */
2490                                 rdc_set_mflags(urdc, RDC_RSYNC_NEEDED);
2491                         else
2492                                 /* Secondary, so sync needed */
2493                                 rdc_set_flags(urdc, RDC_SYNC_NEEDED);
2494                         rdc_set_flags_log(urdc, RDC_VOL_FAILED,
2495                             "local write failed");
2496                         rdc_many_exit(krdc);
2497                         rdc_write_state(urdc);
2498                 }
2499         }
2500 
        /*
         * This is where we either enqueue async IO for the flusher,
         * or do sync IO in the case of an error in thread creation,
         * or we are doing a forward sync.
         * NOTE: if we are async, and using a diskq, we have
         * already enqueued this write.
         * _rdc_remote_write will end up enqueueing to memory, or, in
         * case of a thread creation error above (thrrc == ENOEXEC),
         * it will retry the diskq enqueue.
         */
        if (((IS_PRIMARY(urdc)) && (thrrc == ENOEXEC)) ||
            (BUF_IS_ASYNC(h) && !RDC_IS_DISKQ(krdc->group))) {
2513                 thrrc = 0;
2514                 if (IS_MULTI(krdc)) {
2515                         rdc_k_info_t *ktmp;
2516                         rdc_u_info_t *utmp;
2517 
2518                         ktmp = krdc->multi_next;
2519                         utmp = &rdc_u_info[ktmp->index];
2520                         if (IS_ENABLED(utmp))
2521                                 multi = ktmp;
2522                 }
2523 
2524                 DTRACE_PROBE(rdc_write_remote_start);
2525 
2526                 rc2 = _rdc_remote_write(krdc, h, &h->rdc_bufh,
2527                     pos, len, flag, bitmask);
2528 
2529                 DTRACE_PROBE(rdc_rdcwrite_remote_end);
2530         }
2531 
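        /*
         * Error priority: on a primary, a local write failure is only
         * surfaced if the remote write failed too (the secondary copy
         * is still good otherwise); in remote io or reverse sync mode
         * the remote status is reported directly.
         */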
2532         if (!RDC_SUCCESS(rc1)) {
2533                 if ((IS_PRIMARY(urdc)) && !RDC_SUCCESS(rc2)) {
2534                         h->rdc_bufh.sb_error = rc1;
2535                 }
2536         } else if ((remote || rsync) && !RDC_SUCCESS(rc2)) {
2537                 h->rdc_bufh.sb_error = rc2;
2538         }
2539 write2:
2540         /*
2541          * If one to many, jump back into the loop to continue IO
2542          */
2543         if (IS_MANY(krdc) && (IS_PRIMARY(urdc))) {
2544                 rdc_many_enter(krdc);
2545                 for (krdc = krdc->many_next; krdc != this;
2546                     krdc = krdc->many_next) {
2547                         urdc = &rdc_u_info[krdc->index];
2548                         if (!IS_ENABLED(urdc))
2549                                 continue;
2550                         rc2 = first = 0;
2551                         h->rdc_flags &= ~RDC_ASYNC_BUF;
2552                         rdc_many_exit(krdc);
2553                         goto write1;
2554                 }
2555                 rdc_many_exit(krdc);
2556         }
2557         urdc = &rdc_u_info[krdc->index];
2558 
2559         /*
2560          * collect all of our threads if any
2561          */
2562         if (nthr) {
2563 
2564                 mutex_enter(&(sync->lock));
2565                 /* wait for the threads */
2566                 while (sync->complete != sync->threads) {
2567                         cv_wait(&(sync->cv), &(sync->lock));
2568                 }
2569                 mutex_exit(&(sync->lock));
2570 
2571                 /* collect status */
2572 
2573                 winddown = 0;
2574                 while (winddown < nthr) {
2575                         /*
2576                          * Get any error return from thread
2577                          */
2578                         if ((remote || rsync) && bp[winddown]->flag) {
2579                                 h->rdc_bufh.sb_error = bp[winddown]->flag;
2580                         }
                        kmem_free(bp[winddown], sizeof (rdc_aio_t));
2583                         winddown++;
2584                 }
2585         }
2586 
2587         if (rsync && !(IS_STATE(urdc, RDC_VOL_FAILED))) {
2588                 rc1 = nsc_write(h->rdc_bufp, pos, len, flag);
2589                 if (!RDC_SUCCESS(rc1)) {
2590                         /* rsync, so reverse sync needed already set */
2591                         rdc_many_enter(krdc);
2592                         rdc_set_flags_log(urdc, RDC_VOL_FAILED,
2593                             "rsync local write failed");
2594                         rdc_many_exit(krdc);
2595                         rdc_write_state(urdc);
2596 
2597                         /*
2598                          * only report the error if a remote error
2599                          * occurred as well.
2600                          */
2601                         if (h->rdc_bufh.sb_error)
2602                                 h->rdc_bufh.sb_error = rc1;
2603                 }
2604         }
2605 
2606         if (multi) {
2607                 /* Multi-hop secondary, just set bits in the bitmap */
2608                 (void) RDC_SET_BITMAP(multi, pos, len, &bitmask);
2609         }
2610 
2611         return (h->rdc_bufh.sb_error);
2612 }
2613 
2614 
2615 static void
2616 _rdc_bzero(nsc_buf_t *h, nsc_off_t pos, nsc_size_t len)
2617 {
2618         nsc_vec_t *v;
2619         uchar_t *a;
2620         size_t sz;
2621         int l;
2622 
2623         if (!RDC_HANDLE_LIMITS(h, pos, len)) {
2624                 cmn_err(CE_WARN,
2625                     "!_rdc_bzero: bounds check: io(handle) pos %" NSC_XSZFMT
2626                     "(%" NSC_XSZFMT ") len %" NSC_XSZFMT "(%" NSC_XSZFMT ")",
2627                     pos, h->sb_pos, len, h->sb_len);
2628                 return;
2629         }
2630 
2631         if (!len)
2632                 return;
2633 
2634         /* find starting point */
2635 
2636         v = h->sb_vec;
2637         pos -= h->sb_pos;
2638 
2639         for (; pos >= FBA_NUM(v->sv_len); v++)
2640                 pos -= FBA_NUM(v->sv_len);
2641 
2642         a = v->sv_addr + FBA_SIZE(pos);
2643         l = v->sv_len - FBA_SIZE(pos);
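        /*
         * Example (hypothetical sizes, assuming 512-byte FBAs): with
         * sv_len == 8192 (FBA_NUM(8192) == 16) and pos == 20, the loop
         * above skips one vector element leaving pos == 4, so zeroing
         * starts at sv_addr + FBA_SIZE(4) == sv_addr + 2048 bytes.
         */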
2644 
2645         /* zero */
2646 
2647         len = FBA_SIZE(len);    /* convert to bytes */
2648 
2649         while (len) {
2650                 if (!a)         /* end of vec */
2651                         break;
2652 
2653                 sz = (size_t)min((nsc_size_t)l, len);
2654 
2655                 bzero(a, sz);
2656 
2657                 len -= sz;
2658                 l -= sz;
2659                 a += sz;
2660 
2661                 if (!l) {
2662                         v++;
2663                         a = v->sv_addr;
2664                         l = v->sv_len;
2665                 }
2666         }
2667 }
2668 
2669 
2670 /*
2671  * _rdc_zero
2672  *
2673  * Zero and commit the specified area of the buffer.
2674  *
2675  * If this write is whilst the local primary volume is being synced,
2676  * then we write the remote end first to ensure that the new data
2677  * cannot be overwritten by a concurrent sync operation.
2678  */
2679 
2680 static int
2681 _rdc_zero(rdc_buf_t *h, nsc_off_t pos, nsc_size_t len, int flag)
2682 {
2683         rdc_k_info_t *krdc = h->rdc_fd->rdc_info;
2684         rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
2685         rdc_k_info_t *this;
2686         rdc_k_info_t *multi = NULL;
2687         int remote = RDC_REMOTE(h);
2688         int rc1, rc2;
2689         uint_t bitmask;
2690         int first;
2691         int rsync;
2692 
2693         /* If this is the multi-hop secondary, move along to the primary */
2694         if (IS_MULTI(krdc) && !(rdc_get_vflags(urdc) & RDC_PRIMARY)) {
2695                 multi = krdc;
2696                 krdc = krdc->multi_next;
2697                 urdc = &rdc_u_info[krdc->index];
2698 
2699                 if (!IS_ENABLED(urdc)) {
2700                         krdc = h->rdc_fd->rdc_info;
2701                         urdc = &rdc_u_info[krdc->index];
2702                         multi = NULL;
2703                 }
2704         }
2705         this = krdc;
2706 
2707         rsync = ((rdc_get_vflags(urdc) & RDC_PRIMARY) &&
2708             (rdc_get_mflags(urdc) & RDC_SLAVE));
2709 
2710         /*
2711          * If this is a many group with a reverse sync in progress and
2712          * this is not the slave krdc/urdc, then search for the slave
2713          * so that we can do the remote io to the correct secondary
2714          * before the local io.
2715          */
2716         if (rsync && !(rdc_get_vflags(urdc) & RDC_SLAVE)) {
2717                 rdc_many_enter(krdc);
2718                 for (krdc = krdc->many_next; krdc != this;
2719                     krdc = krdc->many_next) {
2720                         urdc = &rdc_u_info[krdc->index];
2721                         if (!IS_ENABLED(urdc))
2722                                 continue;
2723                         if (rdc_get_vflags(urdc) & RDC_SLAVE)
2724                                 break;
2725                 }
2726                 rdc_many_exit(krdc);
2727 
2728                 this = krdc;
2729         }
2730 
2731         rc1 = rc2 = 0;
2732         first = 1;
2733 
2734         if (!RDC_HANDLE_LIMITS(&h->rdc_bufh, pos, len)) {
2735                 cmn_err(CE_WARN,
2736                     "!_rdc_zero: bounds check: io(handle) pos %" NSC_XSZFMT
2737                     "(%" NSC_XSZFMT ") len %" NSC_XSZFMT "(%" NSC_XSZFMT ")",
2738                     pos, h->rdc_bufh.sb_pos, len, h->rdc_bufh.sb_len);
2739                 h->rdc_bufh.sb_error = EINVAL;
2740                 return (h->rdc_bufh.sb_error);
2741         }
2742 
2743 zero1:
2744         if (RDC_SET_BITMAP(krdc, pos, len, &bitmask) < 0) {
2745                 (void) nsc_uncommit(h->rdc_bufp, pos, len, flag);
2746                 h->rdc_bufh.sb_error = EIO;
2747                 goto zero2;
2748         }
2749 
2750         if (IS_ASYNC(urdc)) {
2751                 /*
2752                  * We are async mode
2753                  */
2754                 aio_buf_t *p;
2755 
2756                 if ((krdc->type_flag & RDC_DISABLEPEND) ||
2757                     (rdc_get_vflags(urdc) & RDC_LOGGING)) {
2758                         mutex_exit(&krdc->group->ra_queue.net_qlock);
2759                         goto localzero;
2760                 }
2761 
2762                 if ((rdc_get_vflags(urdc) & RDC_VOL_FAILED) ||
2763                     (rdc_get_vflags(urdc) & RDC_BMP_FAILED)) {
2764                         mutex_exit(&krdc->group->ra_queue.net_qlock);
2765                         goto zero2;
2766                 }
2771                 p = rdc_aio_buf_add(krdc->index, h);
2772                 if (p == NULL) {
2773 #ifdef DEBUG
2774                         cmn_err(CE_WARN,
                            "!rdc_alloc_buf aio_buf allocation failed");
2776 #endif
2777                         goto localzero;
2778                 }
2779                 mutex_enter(&h->aio_lock);
2780                 rc1 = nsc_alloc_abuf(pos, len, 0, &p->rdc_abufp);
2781                 if (!RDC_SUCCESS(rc1)) {
2782 #ifdef DEBUG
2783                         cmn_err(CE_WARN,
2784                             "!rdc_alloc_buf NSC_ANON allocation failed rc %d",
2785                             rc1);
2786 #endif
2787                         mutex_exit(&h->aio_lock);
2788                         goto localzero;
2789                 }
2790                 h->rdc_flags |= RDC_ASYNC_VEC;
2791                 mutex_exit(&h->aio_lock);
2792 
        /*
         * Zero the anonymous buffer
         */
2796 
2797                 rc1 = nsc_zero(p->rdc_abufp, pos, len, flag);
2798                 if (!RDC_SUCCESS(rc1)) {
2799 #ifdef DEBUG
2800                         cmn_err(CE_WARN,
2801                             "!_rdc_zero: nsc_zero failed rc=%d state %x",
2802                             rc1, rdc_get_vflags(urdc));
2803 #endif
2804                         rc1 = nsc_free_buf(p->rdc_abufp);
2805                         rdc_aio_buf_del(h, krdc);
2806                         rdc_group_enter(krdc);
2807                         rdc_group_log(krdc, RDC_FLUSH | RDC_OTHERREMOTE,
2808                             "nsc_zero failed");
2809                         rdc_group_exit(krdc);
2810                 }
2811         }       /* end of async */
2812 
2813 localzero:
2814 
2815         if (flag & NSC_NOBLOCK) {
2816                 cmn_err(CE_WARN,
2817                     "!_rdc_zero: removing unsupported NSC_NOBLOCK flag");
2818                 flag &= ~(NSC_NOBLOCK);
2819         }
2820 
2821         if (!remote && !rsync && first) {
2822                 rc1 = nsc_zero(h->rdc_bufp, pos, len, flag);
2823                 if (!RDC_SUCCESS(rc1)) {
2824                         ASSERT(rdc_get_vflags(urdc) & RDC_PRIMARY);
2825                         rdc_many_enter(krdc);
2826                         /* Primary, so reverse sync needed */
2827                         rdc_set_mflags(urdc, RDC_RSYNC_NEEDED);
2828                         rdc_set_flags_log(urdc, RDC_VOL_FAILED,
2829                             "nsc_zero failed");
2830                         rdc_many_exit(krdc);
2831                         rdc_write_state(urdc);
2832                 }
2833         }
2834 
2835         /*
2836          * send new data to remote end - nsc_zero has zero'd
2837          * the data in the buffer, or _rdc_bzero will be used below.
2838          */
2839 
2840         if (rdc_get_vflags(urdc) & RDC_PRIMARY) {
2841                 if (first && (remote || rsync || !RDC_SUCCESS(rc1))) {
2842                         /* bzero so that we can send new data to remote node */
2843                         _rdc_bzero(&h->rdc_bufh, pos, len);
2844                 }
2845 
2846                 if (IS_MULTI(krdc)) {
2847                         rdc_k_info_t *ktmp;
2848                         rdc_u_info_t *utmp;
2849 
2850                         ktmp = krdc->multi_next;
2851                         utmp = &rdc_u_info[ktmp->index];
2852                         if (IS_ENABLED(utmp))
2853                                 multi = ktmp;
2854                 }
2855 
2856                 rc2 = _rdc_remote_write(krdc, h, &h->rdc_bufh,
2857                     pos, len, flag, bitmask);
2858         }
2859 
2860         if (!RDC_SUCCESS(rc1)) {
2861                 if ((rdc_get_vflags(urdc) & RDC_PRIMARY) && !RDC_SUCCESS(rc2)) {
2862                         h->rdc_bufh.sb_error = rc1;
2863                 }
2864         } else if ((remote || rsync) && !RDC_SUCCESS(rc2)) {
2865                 h->rdc_bufh.sb_error = rc2;
2866         }
2867 
2868 zero2:
        if (IS_MANY(krdc) && (rdc_get_vflags(urdc) & RDC_PRIMARY)) {
2870                 rdc_many_enter(krdc);
2871                 for (krdc = krdc->many_next; krdc != this;
2872                     krdc = krdc->many_next) {
2873                         urdc = &rdc_u_info[krdc->index];
2874                         if (!IS_ENABLED(urdc))
2875                                 continue;
2876                         rc2 = first = 0;
2877                         rdc_many_exit(krdc);
2878                         goto zero1;
2879                 }
2880                 rdc_many_exit(krdc);
2881         }
2882 
2883         if (rsync && !(rdc_get_vflags(urdc) & RDC_VOL_FAILED)) {
2884                 rc1 = nsc_write(h->rdc_bufp, pos, len, flag);
2885                 if (!RDC_SUCCESS(rc1)) {
2886                         /* rsync, so reverse sync needed already set */
2887                         rdc_many_enter(krdc);
2888                         rdc_set_flags_log(urdc, RDC_VOL_FAILED,
2889                             "nsc_write failed");
2890                         rdc_many_exit(krdc);
2891                         rdc_write_state(urdc);
2892 
2893                         /*
2894                          * only report the error if a remote error
2895                          * occurred as well.
2896                          */
2897                         if (h->rdc_bufh.sb_error)
2898                                 h->rdc_bufh.sb_error = rc1;
2899                 }
2900         }
2901 
2902         if (multi) {
2903                 /* Multi-hop secondary, just set bits in the bitmap */
2904                 (void) RDC_SET_BITMAP(multi, pos, len, &bitmask);
2905         }
2906 
2907         return (h->rdc_bufh.sb_error);
2908 }
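
/*
 * Note that _rdc_zero mirrors the structure of _rdc_write, substituting
 * nsc_zero()/_rdc_bzero() for nsc_write()/nsc_copy(), but it performs
 * the local and remote io sequentially rather than overlapping them
 * with worker threads.
 */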
2909 
2910 
2911 /*
2912  * _rdc_uncommit
 * - refresh the specified data region in the buffer to prevent the
 *   cache serving the scribbled-on data back to another client.
2915  *
2916  * Only needs to happen on the local node.  If in remote io mode, then
2917  * just return 0 - we do not cache the data on the local node and the
2918  * changed data will not have made it to the cache on the other node,
2919  * so it has no need to uncommit.
2920  */
2921 
2922 static int
2923 _rdc_uncommit(rdc_buf_t *h, nsc_off_t pos, nsc_size_t len, int flag)
2924 {
2925         int remote = RDC_REMOTE(h);
2926         int rc = 0;
2927 
2928         if (!RDC_HANDLE_LIMITS(&h->rdc_bufh, pos, len)) {
2929                 cmn_err(CE_WARN,
2930                     "!_rdc_uncommit: bounds check: io(handle) pos %" NSC_XSZFMT
2931                     "(%" NSC_XSZFMT ") len %" NSC_XSZFMT "(%" NSC_XSZFMT ")",
2932                     pos, h->rdc_bufh.sb_pos, len, h->rdc_bufh.sb_len);
2933                 h->rdc_bufh.sb_error = EINVAL;
2934                 return (h->rdc_bufh.sb_error);
2935         }
2936 
2937         if (flag & NSC_NOBLOCK) {
2938                 cmn_err(CE_WARN,
2939                     "!_rdc_uncommit: removing unsupported NSC_NOBLOCK flag");
2940                 flag &= ~(NSC_NOBLOCK);
2941         }
2942 
2943         if (!remote) {
2944                 rc = nsc_uncommit(h->rdc_bufp, pos, len, flag);
2945         }
2946 
2947         if (!RDC_SUCCESS(rc))
2948                 h->rdc_bufh.sb_error = rc;
2949 
2950         return (rc);
2951 }
2952 
2953 
2954 /*
2955  * _rdc_trksize
2956  *
2957  * only needs to happen on local node.
2958  */
2959 
2960 static int
2961 _rdc_trksize(rdc_fd_t *rfd, nsc_size_t trksize)
2962 {
2963         return (nsc_set_trksize(RDC_FD(rfd), trksize));
2964 }
2965 
2966 
2967 static nsc_def_t _rdc_fd_def[] = {
2968         { "Attach",     (uintptr_t)_rdc_attach_fd,      0 },
2969         { "Pinned",     (uintptr_t)_rdc_pinned,         0 },
2970         { "Unpinned",   (uintptr_t)_rdc_unpinned,       0 },
2971         { NULL,         (uintptr_t)NULL,                0 }
2972 };
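
/*
 * Operation tables for the cached and raw nsctl io providers.  The two
 * tables below differ only in their "Open" entry point (_rdc_openc for
 * the cached path, _rdc_openr for raw); all other operations are shared.
 */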
2973 
2974 
2975 static nsc_def_t _rdc_io_def[] = {
2976         { "Open",       (uintptr_t)_rdc_openc,          0 },
2977         { "Close",      (uintptr_t)_rdc_close,          0 },
2978         { "Attach",     (uintptr_t)_rdc_attach,         0 },
2979         { "Detach",     (uintptr_t)_rdc_detach,         0 },
2980         { "AllocHandle", (uintptr_t)_rdc_alloc_handle,  0 },
2981         { "FreeHandle", (uintptr_t)_rdc_free_handle,    0 },
2982         { "AllocBuf",   (uintptr_t)_rdc_alloc_buf,      0 },
2983         { "FreeBuf",    (uintptr_t)_rdc_free_buf,       0 },
2984         { "GetPinned",  (uintptr_t)_rdc_get_pinned,     0 },
2985         { "Discard",    (uintptr_t)_rdc_discard_pinned, 0 },
2986         { "PartSize",   (uintptr_t)_rdc_partsize,       0 },
2987         { "MaxFbas",    (uintptr_t)_rdc_maxfbas,        0 },
2988         { "Control",    (uintptr_t)_rdc_control,        0 },
2989         { "Read",       (uintptr_t)_rdc_read,           0 },
2990         { "Write",      (uintptr_t)_rdc_write,          0 },
2991         { "Zero",       (uintptr_t)_rdc_zero,           0 },
2992         { "Uncommit",   (uintptr_t)_rdc_uncommit,       0 },
2993         { "TrackSize",  (uintptr_t)_rdc_trksize,        0 },
2994         { "Provide",    (uintptr_t)NULL,                0 },
2995         { NULL,         (uintptr_t)NULL,                0 }
2996 };
2997 
2998 static nsc_def_t _rdc_ior_def[] = {
2999         { "Open",       (uintptr_t)_rdc_openr,          0 },
3000         { "Close",      (uintptr_t)_rdc_close,          0 },
3001         { "Attach",     (uintptr_t)_rdc_attach,         0 },
3002         { "Detach",     (uintptr_t)_rdc_detach,         0 },
3003         { "AllocHandle", (uintptr_t)_rdc_alloc_handle,  0 },
3004         { "FreeHandle", (uintptr_t)_rdc_free_handle,    0 },
3005         { "AllocBuf",   (uintptr_t)_rdc_alloc_buf,      0 },
3006         { "FreeBuf",    (uintptr_t)_rdc_free_buf,       0 },
3007         { "GetPinned",  (uintptr_t)_rdc_get_pinned,     0 },
3008         { "Discard",    (uintptr_t)_rdc_discard_pinned, 0 },
3009         { "PartSize",   (uintptr_t)_rdc_partsize,       0 },
3010         { "MaxFbas",    (uintptr_t)_rdc_maxfbas,        0 },
3011         { "Control",    (uintptr_t)_rdc_control,        0 },
3012         { "Read",       (uintptr_t)_rdc_read,           0 },
3013         { "Write",      (uintptr_t)_rdc_write,          0 },
3014         { "Zero",       (uintptr_t)_rdc_zero,           0 },
3015         { "Uncommit",   (uintptr_t)_rdc_uncommit,       0 },
3016         { "TrackSize",  (uintptr_t)_rdc_trksize,        0 },
3017         { "Provide",    (uintptr_t)NULL,                0 },
3018         { NULL,         (uintptr_t)NULL,                0 }
3019 };