1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21 /*
22 * Copyright 2009 Sun Microsystems, Inc. All rights reserved.
23 * Use is subject to license terms.
24 */
25
26 #include <sys/types.h>
27 #include <sys/ksynch.h>
28 #include <sys/cmn_err.h>
29 #include <sys/kmem.h>
30 #include <sys/stat.h>
31 #include <sys/errno.h>
32
33 #include "../solaris/nsc_thread.h"
34 #ifdef DS_DDICT
35 #include "../contract.h"
36 #endif
37 #include <sys/nsctl/nsctl.h>
38
39 #include <sys/kmem.h>
40 #include <sys/ddi.h>
41
42 #include <sys/sdt.h> /* dtrace is S10 or later */
43
44 #include "rdc_io.h"
45 #include "rdc_bitmap.h"
46 #include "rdc_diskq.h"
47 #include "rdc_clnt.h"
48
49 #include <sys/unistat/spcs_s.h>
50 #include <sys/unistat/spcs_s_k.h>
51 #include <sys/unistat/spcs_errors.h>
52
53 extern nsc_io_t *_rdc_io_hc;
54
55 int rdc_diskq_coalesce = 0;
56
57 int
58 _rdc_rsrv_diskq(rdc_group_t *group)
59 {
60 int rc = 0;
61
62 mutex_enter(&group->diskqmutex);
63 if (group->diskqfd == NULL) {
64 mutex_exit(&group->diskqmutex);
65 return (EIO);
66 } else if ((group->diskqrsrv == 0) &&
67 (rc = nsc_reserve(group->diskqfd, 0)) != 0) {
68 cmn_err(CE_WARN,
69 "!rdc: nsc_reserve(%s) failed %d\n",
70 nsc_pathname(group->diskqfd), rc);
71 } else {
72 group->diskqrsrv++;
73 }
74
75 mutex_exit(&group->diskqmutex);
76 return (rc);
77 }
78
79 void
80 _rdc_rlse_diskq(rdc_group_t *group)
81 {
82 mutex_enter(&group->diskqmutex);
83 if (group->diskqrsrv > 0 && --group->diskqrsrv == 0) {
84 nsc_release(group->diskqfd);
85 }
86 mutex_exit(&group->diskqmutex);
87 }
88
89 void
90 rdc_wait_qbusy(disk_queue *q)
91 {
92 ASSERT(MUTEX_HELD(QLOCK(q)));
93 while (q->busycnt > 0)
94 cv_wait(&q->busycv, QLOCK(q));
95 }
96
97 void
98 rdc_set_qbusy(disk_queue *q)
99 {
100 ASSERT(MUTEX_HELD(QLOCK(q)));
101 q->busycnt++;
102 }
103
104 void
105 rdc_clr_qbusy(disk_queue *q)
106 {
107 ASSERT(MUTEX_HELD(QLOCK(q)));
108 q->busycnt--;
109 if (q->busycnt == 0)
110 cv_broadcast(&q->busycv);
111 }
112
113 int
114 rdc_lookup_diskq(char *pathname)
115 {
116 rdc_u_info_t *urdc;
117 #ifdef DEBUG
118 rdc_k_info_t *krdc;
119 #endif
120 int index;
121
122 for (index = 0; index < rdc_max_sets; index++) {
123 urdc = &rdc_u_info[index];
124 #ifdef DEBUG
125 krdc = &rdc_k_info[index];
126 #endif
127 ASSERT(krdc->index == index);
128 ASSERT(urdc->index == index);
129 if (!IS_ENABLED(urdc))
130 continue;
131
132 if (strncmp(pathname, urdc->disk_queue,
133 NSC_MAXPATH) == 0)
134 return (index);
135 }
136
137 return (-1);
138 }
139
140 void
141 rdc_unintercept_diskq(rdc_group_t *grp)
142 {
143 if (!RDC_IS_DISKQ(grp))
144 return;
145 if (grp->q_tok)
146 (void) nsc_unregister_path(grp->q_tok, 0);
147 grp->q_tok = NULL;
148 }
149
150 void
151 rdc_close_diskq(rdc_group_t *grp)
152 {
153
154 if (grp == NULL) {
155 #ifdef DEBUG
156 cmn_err(CE_WARN, "!rdc_close_diskq: NULL group!");
157 #endif
158 return;
159 }
160
161 if (grp->diskqfd) {
162 if (nsc_close(grp->diskqfd) != 0) {
163 #ifdef DEBUG
164 cmn_err(CE_WARN, "!nsc_close on diskq failed");
165 #else
166 ;
167 /*EMPTY*/
168 #endif
169 }
170 grp->diskqfd = 0;
171 grp->diskqrsrv = 0;
172 }
173 bzero(&grp->diskq.disk_hdr, sizeof (diskq_header));
174 }
175
176 /*
177 * nsc_open the diskq and attach
178 * the nsc_fd to krdc->diskqfd
179 */
180 int
181 rdc_open_diskq(rdc_k_info_t *krdc)
182 {
183 rdc_u_info_t *urdc;
184 rdc_group_t *grp;
185 int sts;
186 nsc_size_t size;
187 char *diskqname;
188 int mutexheld = 0;
189
190 grp = krdc->group;
191 urdc = &rdc_u_info[krdc->index];
192
193 mutex_enter(&grp->diskqmutex);
194 mutexheld++;
195 if (urdc->disk_queue[0] == '\0') {
196 goto fail;
197 }
198
199 diskqname = &urdc->disk_queue[0];
200
201 if (grp->diskqfd == NULL) {
202 grp->diskqfd = nsc_open(diskqname,
203 NSC_RDCHR_ID|NSC_DEVICE|NSC_WRITE, 0, 0, 0);
204 if (grp->diskqfd == NULL) {
205 cmn_err(CE_WARN, "!rdc_open_diskq: Unable to open %s",
206 diskqname);
207 goto fail;
208 }
209 }
210 if (!grp->q_tok)
211 grp->q_tok = nsc_register_path(urdc->disk_queue,
212 NSC_DEVICE | NSC_CACHE, _rdc_io_hc);
213
214 grp->diskqrsrv = 0; /* init reserve count */
215
216 mutex_exit(&grp->diskqmutex);
217 mutexheld--;
218 /* just test a reserve release */
219 sts = _rdc_rsrv_diskq(grp);
220 if (!RDC_SUCCESS(sts)) {
221 cmn_err(CE_WARN, "!rdc_open_diskq: Reserve failed for %s",
222 diskqname);
223 goto fail;
224 }
225 sts = nsc_partsize(grp->diskqfd, &size);
226 _rdc_rlse_diskq(grp);
227
228 if ((sts == 0) && (size < 1)) {
229 rdc_unintercept_diskq(grp);
230 rdc_close_diskq(grp);
231 goto fail;
232 }
233
234 return (0);
235
236 fail:
237 bzero(&urdc->disk_queue, NSC_MAXPATH);
238 if (mutexheld)
239 mutex_exit(&grp->diskqmutex);
240 return (-1);
241
242 }
243
244 /*
245 * rdc_count_vecs
246 * simply vec++'s until sb_addr is null
247 * returns number of vectors encountered
248 */
249 int
250 rdc_count_vecs(nsc_vec_t *vec)
251 {
252 nsc_vec_t *vecp;
253 int i = 0;
254 vecp = vec;
255 while (vecp->sv_addr) {
256 vecp++;
257 i++;
258 }
259 return (i+1);
260 }
261 /*
262 * rdc_setid2idx
263 * given setid, return index
264 */
265 int
266 rdc_setid2idx(int setid)
267 {
268
269 int index = 0;
270
271 for (index = 0; index < rdc_max_sets; index++) {
272 if (rdc_u_info[index].setid == setid)
273 break;
274 }
275 if (index >= rdc_max_sets)
276 index = -1;
277 return (index);
278 }
279
280 /*
281 * rdc_idx2setid
282 * given an index, return its setid
283 */
284 int
285 rdc_idx2setid(int index)
286 {
287 return (rdc_u_info[index].setid);
288 }
289
290 /*
291 * rdc_fill_ioheader
292 * fill in all the stuff you want to save on disk
293 * at the beginnig of each queued write
294 */
295 void
296 rdc_fill_ioheader(rdc_aio_t *aio, io_hdr *hd, int qpos)
297 {
298 ASSERT(MUTEX_HELD(&rdc_k_info[aio->index].group->diskq.disk_qlock));
299
300 hd->dat.magic = RDC_IOHDR_MAGIC;
301 hd->dat.type = RDC_QUEUEIO;
302 hd->dat.pos = aio->pos;
303 hd->dat.hpos = aio->pos;
304 hd->dat.qpos = qpos;
305 hd->dat.len = aio->len;
306 hd->dat.flag = aio->flag;
307 hd->dat.iostatus = aio->iostatus;
308 hd->dat.setid = rdc_idx2setid(aio->index);
309 hd->dat.time = nsc_time();
310 if (!aio->handle)
311 hd->dat.flag |= RDC_NULL_BUF; /* no real data to queue */
312 }
313
314 /*
315 * rdc_dump_iohdrs
316 * give back the iohdr list
317 * and clear out q->lastio
318 */
319 void
320 rdc_dump_iohdrs(disk_queue *q)
321 {
322 io_hdr *p, *r;
323
324 ASSERT(MUTEX_HELD(QLOCK(q)));
325
326 p = q->iohdrs;
327 while (p) {
328 r = p->dat.next;
329 kmem_free(p, sizeof (*p));
330 q->hdrcnt--;
331 p = r;
332 }
333 q->iohdrs = q->hdr_last = NULL;
334 q->hdrcnt = 0;
335 if (q->lastio->handle)
336 (void) nsc_free_buf(q->lastio->handle);
337 bzero(&(*q->lastio), sizeof (*q->lastio));
338 }
339
340 /*
341 * rdc_fail_diskq
342 * set flags, throw away q info
343 * clean up what you can
344 * wait for flusher threads to stop (taking into account this may be one)
345 * takes group_lock, so conf, many, and bitmap may not be held
346 */
347 void
348 rdc_fail_diskq(rdc_k_info_t *krdc, int wait, int flag)
349 {
350 rdc_k_info_t *p;
351 rdc_u_info_t *q = &rdc_u_info[krdc->index];
352 rdc_group_t *group = krdc->group;
353 disk_queue *dq = &krdc->group->diskq;
354
355 if (IS_STATE(q, RDC_DISKQ_FAILED))
356 return;
357
358 if (!(flag & RDC_NOFAIL))
359 cmn_err(CE_WARN, "!disk queue %s failure", q->disk_queue);
360
361 if (flag & RDC_DOLOG) {
362 rdc_group_enter(krdc);
363 rdc_group_log(krdc, RDC_NOFLUSH | RDC_ALLREMOTE,
364 "disk queue failed");
365 rdc_group_exit(krdc);
366 }
367 mutex_enter(QHEADLOCK(dq));
368 mutex_enter(QLOCK(dq));
369 /*
370 * quick stop of the flushers
371 * other cleanup is done on the un-failing of the diskq
372 */
373 SET_QHEAD(dq, RDC_DISKQ_DATA_OFF);
374 SET_QTAIL(dq, RDC_DISKQ_DATA_OFF);
375 SET_QNXTIO(dq, RDC_DISKQ_DATA_OFF);
376 SET_LASTQTAIL(dq, 0);
377
378 rdc_dump_iohdrs(dq);
379
380 mutex_exit(QLOCK(dq));
381 mutex_exit(QHEADLOCK(dq));
382
383 bzero(krdc->bitmap_ref, krdc->bitmap_size * BITS_IN_BYTE *
384 BMAP_REF_PREF_SIZE);
385
386 if (flag & RDC_DOLOG) /* otherwise, we already have the conf lock */
387 rdc_group_enter(krdc);
388
389 else if (!(flag & RDC_GROUP_LOCKED))
390 ASSERT(MUTEX_HELD(&rdc_conf_lock));
391
392 if (!(flag & RDC_NOFAIL)) {
393 rdc_set_flags(q, RDC_DISKQ_FAILED);
394 }
395 rdc_clr_flags(q, RDC_QUEUING);
396
397 for (p = krdc->group_next; p != krdc; p = p->group_next) {
398 q = &rdc_u_info[p->index];
399 if (!IS_ENABLED(q))
400 continue;
401 if (!(flag & RDC_NOFAIL)) {
402 rdc_set_flags(q, RDC_DISKQ_FAILED);
403 }
404 rdc_clr_flags(q, RDC_QUEUING);
405 bzero(p->bitmap_ref, p->bitmap_size * BITS_IN_BYTE *
406 BMAP_REF_PREF_SIZE);
407 /* RDC_QUEUING is cleared in group_log() */
408 }
409
410 if (flag & RDC_DOLOG)
411 rdc_group_exit(krdc);
412
413 /* can't wait for myself to go away, I'm a flusher */
414 if (wait & RDC_WAIT)
415 while (group->rdc_thrnum)
416 delay(2);
417
418 }
419
420 /*
421 * rdc_stamp_diskq
422 * write out diskq header info
423 * must have disk_qlock held
424 * if rsrvd flag is 0, the nsc_reserve is done
425 */
426 int
427 rdc_stamp_diskq(rdc_k_info_t *krdc, int rsrvd, int failflags)
428 {
429 nsc_vec_t vec[2];
430 nsc_buf_t *head = NULL;
431 rdc_group_t *grp;
432 rdc_u_info_t *urdc;
433 disk_queue *q;
434 int rc, flags;
435
436 grp = krdc->group;
437 q = &krdc->group->diskq;
438
439 ASSERT(MUTEX_HELD(&q->disk_qlock));
440
441 urdc = &rdc_u_info[krdc->index];
442
443 if (!rsrvd && _rdc_rsrv_diskq(grp)) {
444 cmn_err(CE_WARN, "!rdc_stamp_diskq: %s reserve failed",
445 urdc->disk_queue);
446 mutex_exit(QLOCK(q));
447 rdc_fail_diskq(krdc, RDC_NOWAIT, failflags);
448 mutex_enter(QLOCK(q));
449 return (-1);
450 }
451 flags = NSC_WRITE | NSC_NOCACHE | NSC_NODATA;
452 rc = nsc_alloc_buf(grp->diskqfd, 0, 1, flags, &head);
453
454 if (!RDC_SUCCESS(rc)) {
455 cmn_err(CE_WARN, "!Alloc buf failed for disk queue %s",
456 &urdc->disk_queue[0]);
457 mutex_exit(QLOCK(q));
458 rdc_fail_diskq(krdc, RDC_NOWAIT, failflags);
459 mutex_enter(QLOCK(q));
460 return (-1);
461 }
462 vec[0].sv_len = FBA_SIZE(1);
463 vec[0].sv_addr = (uchar_t *)&q->disk_hdr;
464 vec[1].sv_len = 0;
465 vec[1].sv_addr = NULL;
466
467 head->sb_vec = &vec[0];
468
469 #ifdef DEBUG_DISKQ
470 cmn_err(CE_NOTE, "!rdc_stamp_diskq: hdr: %p magic: %x state: "
471 "%x head: %d tail: %d size: %d nitems: %d blocks: %d",
472 q, QMAGIC(q), QSTATE(q), QHEAD(q),
473 QTAIL(q), QSIZE(q), QNITEMS(q), QBLOCKS(q));
474 #endif
475
476 rc = nsc_write(head, 0, 1, 0);
477
478 if (!RDC_SUCCESS(rc)) {
479 if (!rsrvd)
480 _rdc_rlse_diskq(grp);
481 cmn_err(CE_CONT, "!disk queue %s failed rc %d",
482 &urdc->disk_queue[0], rc);
483 mutex_exit(QLOCK(q));
484 rdc_fail_diskq(krdc, RDC_NOWAIT, failflags);
485 mutex_enter(QLOCK(q));
486 return (-1);
487 }
488
489 (void) nsc_free_buf(head);
490 if (!rsrvd)
491 _rdc_rlse_diskq(grp);
492
493 return (0);
494 }
495
496 /*
497 * rdc_init_diskq_header
498 * load initial values into the header
499 */
500 void
501 rdc_init_diskq_header(rdc_group_t *grp, dqheader *header)
502 {
503 int rc;
504 int type = 0;
505 disk_queue *q = &grp->diskq;
506
507 ASSERT(MUTEX_HELD(QLOCK(q)));
508
509 /* save q type if this is a failure */
510 if (QSTATE(q) & RDC_QNOBLOCK)
511 type = RDC_QNOBLOCK;
512 bzero(header, sizeof (*header));
513 header->h.magic = RDC_DISKQ_MAGIC;
514 header->h.vers = RDC_DISKQ_VERS;
515 header->h.state |= (RDC_SHUTDOWN_BAD|type); /* SHUTDOWN_OK on suspend */
516 header->h.head_offset = RDC_DISKQ_DATA_OFF;
517 header->h.tail_offset = RDC_DISKQ_DATA_OFF;
518 header->h.nitems = 0;
519 header->h.blocks = 0;
520 header->h.qwrap = 0;
521 SET_QNXTIO(q, QHEAD(q));
522 SET_QCOALBOUNDS(q, RDC_DISKQ_DATA_OFF);
523
524 /* do this last, as this might be a failure. get the kernel state ok */
525 rc = _rdc_rsrv_diskq(grp);
526 if (!RDC_SUCCESS(rc)) {
527 cmn_err(CE_WARN, "!init_diskq_hdr: Reserve failed for queue");
528 return;
529 }
530 (void) nsc_partsize(grp->diskqfd, &header->h.disk_size);
531 _rdc_rlse_diskq(grp);
532
533 }
534
535 /*
536 * rdc_unfail_diskq
537 * the diskq failed for some reason, lets try and re-start it
538 * the old stuff has already been thrown away
539 * should just be called from rdc_sync
540 */
541 void
542 rdc_unfail_diskq(rdc_k_info_t *krdc)
543 {
544 rdc_k_info_t *p;
545 rdc_u_info_t *q = &rdc_u_info[krdc->index];
546 rdc_group_t *group = krdc->group;
547 disk_queue *dq = &group->diskq;
548
549 rdc_group_enter(krdc);
550 rdc_clr_flags(q, RDC_ASYNC);
551 /* someone else won the race... */
552 if (!IS_STATE(q, RDC_DISKQ_FAILED)) {
553 rdc_group_exit(krdc);
554 return;
555 }
556 rdc_clr_flags(q, RDC_DISKQ_FAILED);
557 for (p = krdc->group_next; p != krdc; p = p->group_next) {
558 q = &rdc_u_info[p->index];
559 if (!IS_ENABLED(q))
560 continue;
561 rdc_clr_flags(q, RDC_DISKQ_FAILED);
562 rdc_clr_flags(q, RDC_ASYNC);
563 if (IS_STATE(q, RDC_QUEUING))
564 rdc_clr_flags(q, RDC_QUEUING);
565 }
566 rdc_group_exit(krdc);
567
568 mutex_enter(QLOCK(dq));
569
570 rdc_init_diskq_header(group, &group->diskq.disk_hdr);
571 /* real i/o to the queue */
572 /* clear RDC_AUXSYNCIP because we cannot halt a sync that's not here */
573 krdc->aux_state &= ~RDC_AUXSYNCIP;
574 if (rdc_stamp_diskq(krdc, 0, RDC_GROUP_LOCKED | RDC_DOLOG) < 0) {
575 mutex_exit(QLOCK(dq));
576 goto fail;
577 }
578
579 SET_QNXTIO(dq, QHEAD(dq));
580 SET_QHDRCNT(dq, 0);
581 SET_QSTATE(dq, RDC_SHUTDOWN_BAD); /* only suspend can write good */
582 dq->iohdrs = NULL;
583 dq->hdr_last = NULL;
584
585 /* should be none, but.. */
586 rdc_dump_iohdrs(dq);
587
588 mutex_exit(QLOCK(dq));
589
590
591 fail:
592 krdc->aux_state |= RDC_AUXSYNCIP;
593 return;
594
595 }
596
597 int
598 rdc_read_diskq_header(rdc_k_info_t *krdc)
599 {
600 int rc;
601 diskq_header *header;
602 rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
603
604 if (krdc->group->diskqfd == NULL) {
605 char buf[NSC_MAXPATH];
606 (void) snprintf(buf, NSC_MAXPATH, "%s:%s", urdc->secondary.intf,
607 &urdc->secondary.intf[0]);
608 cmn_err(CE_WARN, "!Disk Queue Header read failed for %s",
609 urdc->group_name[0] == '\0' ? buf:
610 &urdc->group_name[0]);
611 return (-1);
612 }
613
614 header = &krdc->group->diskq.disk_hdr.h;
615 if (_rdc_rsrv_diskq(krdc->group)) {
616 return (-1);
617 }
618
619 rc = rdc_ns_io(krdc->group->diskqfd, NSC_RDBUF, 0,
620 (uchar_t *)header, sizeof (diskq_header));
621
622 _rdc_rlse_diskq(krdc->group);
623
624 if (!RDC_SUCCESS(rc)) {
625 char buf[NSC_MAXPATH];
626 (void) snprintf(buf, NSC_MAXPATH, "%s:%s", urdc->secondary.intf,
627 &urdc->secondary.file[0]);
628 cmn_err(CE_WARN, "!Disk Queue Header read failed(%d) for %s",
629 rc, urdc->group_name[0] == '\0' ? buf :
630 &urdc->group_name[0]);
631 return (-1);
632 }
633 return (0);
634 }
635
636 /*
637 * rdc_stop_diskq_flusher
638 */
639 void
640 rdc_stop_diskq_flusher(rdc_k_info_t *krdc)
641 {
642 disk_queue q, *qp;
643 rdc_group_t *group;
644 #ifdef DEBUG
645 cmn_err(CE_NOTE, "!stopping flusher threads");
646 #endif
647 group = krdc->group;
648 qp = &krdc->group->diskq;
649
650 /* save the queue info */
651 q = *qp;
652
653 /* lie a little */
654 SET_QTAIL(qp, RDC_DISKQ_DATA_OFF);
655 SET_QHEAD(qp, RDC_DISKQ_DATA_OFF);
656 SET_QSTATE(qp, RDC_QDISABLEPEND);
657 SET_QSTATE(qp, RDC_STOPPINGFLUSH);
658
659 /* drop locks to allow flushers to die */
660 mutex_exit(QLOCK(qp));
661 mutex_exit(QHEADLOCK(qp));
662 rdc_group_exit(krdc);
663
664 while (group->rdc_thrnum)
665 delay(2);
666
667 rdc_group_enter(krdc);
668 mutex_enter(QHEADLOCK(qp));
669 mutex_enter(QLOCK(qp));
670
671 CLR_QSTATE(qp, RDC_STOPPINGFLUSH);
672 *qp = q;
673 }
674
675 /*
676 * rdc_enable_diskq
677 * open the diskq
678 * and stamp the header onto it.
679 */
680 int
681 rdc_enable_diskq(rdc_k_info_t *krdc)
682 {
683 rdc_group_t *group;
684 disk_queue *q;
685
686 group = krdc->group;
687 q = &group->diskq;
688
689 if (rdc_open_diskq(krdc) < 0)
690 goto fail;
691
692 mutex_enter(QLOCK(q));
693 rdc_init_diskq_header(group, &group->diskq.disk_hdr);
694
695 if (rdc_stamp_diskq(krdc, 0, RDC_NOLOG) < 0) {
696 mutex_exit(QLOCK(q));
697 goto fail;
698 }
699
700 SET_QNXTIO(q, QHEAD(q));
701
702 mutex_exit(QLOCK(q));
703 return (0);
704
705 fail:
706 mutex_enter(&group->diskqmutex);
707 rdc_close_diskq(group);
708 mutex_exit(&group->diskqmutex);
709
710 /* caller has to fail diskq after dropping conf & many locks */
711 return (RDC_EQNOADD);
712 }
713
714 /*
715 * rdc_resume_diskq
716 * open the diskq and read the header
717 */
718 int
719 rdc_resume_diskq(rdc_k_info_t *krdc)
720 {
721 rdc_u_info_t *urdc;
722 rdc_group_t *group;
723 disk_queue *q;
724 int rc = 0;
725
726 urdc = &rdc_u_info[krdc->index];
727 group = krdc->group;
728 q = &group->diskq;
729
730 if (rdc_open_diskq(krdc) < 0) {
731 rc = RDC_EQNOADD;
732 goto fail;
733 }
734
735 mutex_enter(QLOCK(q));
736
737 rdc_init_diskq_header(group, &group->diskq.disk_hdr);
738
739 if (rdc_read_diskq_header(krdc) < 0) {
740 SET_QSTATE(q, RDC_QBADRESUME);
741 rc = RDC_EQNOADD;
742 }
743
744 /* check diskq magic number */
745 if (QMAGIC(q) != RDC_DISKQ_MAGIC) {
746 cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s,"
747 " incorrect magic number in header", urdc->disk_queue);
748 rdc_init_diskq_header(group, &group->diskq.disk_hdr);
749 SET_QSTATE(q, RDC_QBADRESUME);
750 rc = RDC_EQNOADD;
751 } else switch (QVERS(q)) {
752 diskq_header1 h1; /* version 1 header */
753 diskq_header *hc; /* current header */
754
755 #ifdef NSC_MULTI_TERABYTE
756 case RDC_DISKQ_VER_ORIG:
757 /* version 1 diskq header, upgrade to 64bit version */
758 h1 = *(diskq_header1 *)(&group->diskq.disk_hdr.h);
759 hc = &group->diskq.disk_hdr.h;
760
761 cmn_err(CE_WARN, "!SNDR: old version header for diskq %s,"
762 " upgrading to current version", urdc->disk_queue);
763 hc->vers = RDC_DISKQ_VERS;
764 hc->state = h1.state;
765 hc->head_offset = h1.head_offset;
766 hc->tail_offset = h1.tail_offset;
767 hc->disk_size = h1.disk_size;
768 hc->nitems = h1.nitems;
769 hc->blocks = h1.blocks;
770 hc->qwrap = h1.qwrap;
771 hc->auxqwrap = h1.auxqwrap;
772 hc->seq_last = h1.seq_last;
773 hc->ack_last = h1.ack_last;
774
775 if (hc->nitems > 0) {
776 cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s,"
777 " old version Q contains data", urdc->disk_queue);
778 rdc_init_diskq_header(group, &group->diskq.disk_hdr);
779 SET_QSTATE(q, RDC_QBADRESUME);
780 rc = RDC_EQNOADD;
781 }
782 break;
783 #else
784 case RDC_DISKQ_VER_64BIT:
785 cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s,"
786 " diskq header newer than current version",
787 urdc->disk_queue);
788 rdc_init_diskq_header(group, &group->diskq.disk_hdr);
789 SET_QSTATE(q, RDC_QBADRESUME);
790 rc = RDC_EQNOADD;
791 break;
792 #endif
793 case RDC_DISKQ_VERS:
794 /* okay, current version diskq */
795 break;
796 default:
797 cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s,"
798 " unknown diskq header version", urdc->disk_queue);
799 rdc_init_diskq_header(group, &group->diskq.disk_hdr);
800 SET_QSTATE(q, RDC_QBADRESUME);
801 rc = RDC_EQNOADD;
802 break;
803 }
804 if (IS_QSTATE(q, RDC_SHUTDOWN_BAD)) {
805 cmn_err(CE_WARN, "!SNDR: unable to resume diskq %s,"
806 " unsafe shutdown", urdc->disk_queue);
807 rdc_init_diskq_header(group, &group->diskq.disk_hdr);
808 SET_QSTATE(q, RDC_QBADRESUME);
809 rc = RDC_EQNOADD;
810 }
811
812 CLR_QSTATE(q, RDC_SHUTDOWN_OK);
813 SET_QSTATE(q, RDC_SHUTDOWN_BAD);
814
815 /* bad, until proven not bad */
816 if (rdc_stamp_diskq(krdc, 0, RDC_NOLOG) < 0) {
817 rdc_fail_diskq(krdc, RDC_NOWAIT, RDC_NOLOG);
818 rc = RDC_EQNOADD;
819 }
820
821 SET_QNXTIO(q, QHEAD(q));
822 group->diskq.nitems_hwm = QNITEMS(q);
823 group->diskq.blocks_hwm = QBLOCKS(q);
824
825 mutex_exit(QLOCK(q));
826
827 #ifdef DEBUG
828 cmn_err(CE_NOTE, "!rdc_resume_diskq: resuming diskq %s \n",
829 urdc->disk_queue);
830 cmn_err(CE_NOTE, "!qinfo: " QDISPLAY(q));
831 #endif
832 if (rc == 0)
833 return (0);
834
835 fail:
836
837 /* caller has to set the diskq failed after dropping it's locks */
838 return (rc);
839
840 }
841
842 int
843 rdc_suspend_diskq(rdc_k_info_t *krdc)
844 {
845 int rc;
846 rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
847 disk_queue *q;
848
849 q = &krdc->group->diskq;
850
851 /* grab both diskq locks as we are going to kill the flusher */
852 mutex_enter(QHEADLOCK(q));
853 mutex_enter(QLOCK(q));
854
855 if ((krdc->group->rdc_thrnum) && (!IS_QSTATE(q, RDC_STOPPINGFLUSH))) {
856 SET_QSTATE(q, RDC_STOPPINGFLUSH);
857 rdc_stop_diskq_flusher(krdc);
858 CLR_QSTATE(q, RDC_STOPPINGFLUSH);
859 }
860
861 krdc->group->diskq.disk_hdr.h.state &= ~RDC_SHUTDOWN_BAD;
862 krdc->group->diskq.disk_hdr.h.state |= RDC_SHUTDOWN_OK;
863 krdc->group->diskq.disk_hdr.h.state &= ~RDC_QBADRESUME;
864
865 /* let's make sure that the flusher has stopped.. */
866 if (krdc->group->rdc_thrnum) {
867 mutex_exit(QLOCK(q));
868 mutex_exit(QHEADLOCK(q));
869 rdc_group_exit(krdc);
870
871 while (krdc->group->rdc_thrnum)
872 delay(5);
873
874 rdc_group_enter(krdc);
875 mutex_enter(QLOCK(q));
876 mutex_enter(QHEADLOCK(q));
877 }
878 /* write refcount to the bitmap */
879 if ((rc = rdc_write_refcount(krdc)) < 0) {
880 rdc_group_exit(krdc);
881 goto fail;
882 }
883
884 if (!QEMPTY(q)) {
885 rdc_set_flags(urdc, RDC_QUEUING);
886 } else {
887 rdc_clr_flags(urdc, RDC_QUEUING);
888 }
889
890 /* fill in diskq header info */
891 krdc->group->diskq.disk_hdr.h.state &= ~RDC_QDISABLEPEND;
892
893 #ifdef DEBUG
894 cmn_err(CE_NOTE, "!suspending disk queue\n" QDISPLAY(q));
895 #endif
896
897 /* to avoid a possible deadlock, release in order, and reacquire */
898 mutex_exit(QLOCK(q));
899 mutex_exit(QHEADLOCK(q));
900
901 if (krdc->group->count > 1) {
902 rdc_group_exit(krdc);
903 goto fail; /* just stamp on the last suspend */
904 }
905 rdc_group_exit(krdc); /* in case this stamp fails */
906 mutex_enter(QLOCK(q));
907
908 rc = rdc_stamp_diskq(krdc, 0, RDC_NOLOG);
909
910 mutex_exit(QLOCK(q));
911
912 fail:
913 rdc_group_enter(krdc);
914
915 /* diskq already failed if stamp failed */
916
917 return (rc);
918 }
919
920 /*
921 * copy orig aio to copy, including the nsc_buf_t
922 */
923 int
924 rdc_dup_aio(rdc_aio_t *orig, rdc_aio_t *copy)
925 {
926 int rc;
927 bcopy(orig, copy, sizeof (*orig));
928 copy->handle = NULL;
929
930 if (orig->handle == NULL) /* no buf to alloc/copy */
931 return (0);
932
933 rc = nsc_alloc_abuf(orig->pos, orig->len, 0, ©->handle);
934 if (!RDC_SUCCESS(rc)) {
935 #ifdef DEBUG
936 cmn_err(CE_WARN, "!rdc_dup_aio: alloc_buf failed (%d)", rc);
937 #endif
938 return (rc);
939 }
940 rc = nsc_copy(orig->handle, copy->handle, orig->pos,
941 orig->pos, orig->len);
942 if (!RDC_SUCCESS(rc)) {
943 (void) nsc_free_buf(copy->handle);
944 #ifdef DEBUG
945 cmn_err(CE_WARN, "!rdc_dup_aio: copy buf failed (%d)", rc);
946 #endif
947 return (rc);
948 }
949 return (0);
950 }
951
952 /*
953 * rdc_qfill_shldwakeup()
954 * 0 if the memory queue has filled, and the low water
955 * mark has not been reached. 0 if diskq is empty.
956 * 1 if less than low water mark
957 * net_queue mutex is already held
958 */
959 int
960 rdc_qfill_shldwakeup(rdc_k_info_t *krdc)
961 {
962 rdc_group_t *group = krdc->group;
963 rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
964 net_queue *nq = &group->ra_queue;
965 disk_queue *dq = &group->diskq;
966
967 ASSERT(MUTEX_HELD(&nq->net_qlock));
968
969 if (!RDC_IS_DISKQ(krdc->group))
970 return (0);
971
972 if (nq->qfill_sleeping != RDC_QFILL_ASLEEP)
973 return (0);
974
975 if (nq->qfflags & RDC_QFILLSTOP)
976 return (1);
977
978 if (nq->qfflags & RDC_QFILLSLEEP)
979 return (0);
980
981 if (IS_STATE(urdc, RDC_LOGGING) || IS_STATE(urdc, RDC_SYNCING))
982 return (0);
983
984 mutex_enter(QLOCK(dq));
985 if ((QNXTIO(dq) == QTAIL(dq)) && !IS_QSTATE(dq, RDC_QFULL)) {
986 mutex_exit(QLOCK(dq));
987 return (0);
988 }
989 mutex_exit(QLOCK(dq));
990
991 if (nq->qfill_sleeping == RDC_QFILL_ASLEEP) {
992 if (nq->hwmhit) {
993 if (nq->blocks <= RDC_LOW_QBLOCKS) {
994 nq->hwmhit = 0;
995 } else {
996 return (0);
997 }
998 }
999 #ifdef DEBUG_DISKQ_NOISY
1000 cmn_err(CE_NOTE, "!Waking up diskq->memq flusher, flags 0x%x"
1001 " idx: %d", rdc_get_vflags(urdc), urdc->index);
1002 #endif
1003 return (1);
1004 }
1005 return (0);
1006
1007 }
1008
1009 /*
1010 * rdc_diskq_enqueue
1011 * enqueue one i/o to the diskq
1012 * after appending some metadata to the front
1013 */
1014 int
1015 rdc_diskq_enqueue(rdc_k_info_t *krdc, rdc_aio_t *aio)
1016 {
1017 nsc_vec_t *vec = NULL;
1018 nsc_buf_t *bp = NULL;
1019 nsc_buf_t *qbuf = NULL;
1020 io_hdr *iohdr = NULL;
1021 disk_queue *q;
1022 rdc_group_t *group;
1023 int numvecs;
1024 int i, j, rc = 0;
1025 int retries = 0;
1026 rdc_u_info_t *urdc;
1027 nsc_size_t iofbas; /* len of io + io header len */
1028 int qtail;
1029 int delay_time = 2;
1030 int print_msg = 1;
1031
1032 #ifdef DEBUG_WRITER_UBERNOISE
1033 int qhead;
1034 #endif
1035 urdc = &rdc_u_info[krdc->index];
1036 group = krdc->group;
1037 q = &group->diskq;
1038
1039 mutex_enter(QLOCK(q));
1040
1041 /*
1042 * there is a thread that is blocking because the queue is full,
1043 * don't try to set up this write until all is clear
1044 * check before and after for logging or failed queue just
1045 * in case a thread was in flight while the queue was full,
1046 * and in the proccess of failing
1047 */
1048 while (IS_QSTATE(q, RDC_QFULL)) {
1049 if (IS_STATE(urdc, RDC_DISKQ_FAILED) ||
1050 (IS_STATE(urdc, RDC_LOGGING) &&
1051 !IS_STATE(urdc, RDC_QUEUING))) {
1052 mutex_exit(QLOCK(q));
1053 if (aio->handle)
1054 (void) nsc_free_buf(aio->handle);
1055 return (-1);
1056 }
1057 cv_wait(&q->qfullcv, QLOCK(q));
1058
1059 if (IS_STATE(urdc, RDC_DISKQ_FAILED) ||
1060 (IS_STATE(urdc, RDC_LOGGING) &&
1061 !IS_STATE(urdc, RDC_QUEUING))) {
1062 mutex_exit(QLOCK(q));
1063 if (aio->handle)
1064 (void) nsc_free_buf(aio->handle);
1065 return (-1);
1066 }
1067
1068 }
1069
1070 SET_QSTATE(q, QTAILBUSY);
1071
1072 if (aio->handle == NULL) {
1073 /* we're only going to write the header to the queue */
1074 numvecs = 2; /* kmem_alloc io header + null terminate */
1075 iofbas = FBA_LEN(sizeof (io_hdr));
1076
1077 } else {
1078 /* find out how many vecs */
1079 numvecs = rdc_count_vecs(aio->handle->sb_vec) + 1;
1080 iofbas = aio->len + FBA_LEN(sizeof (io_hdr));
1081 }
1082
1083 /*
1084 * this, in conjunction with QTAILBUSY, will prevent
1085 * premature dequeuing
1086 */
1087
1088 SET_LASTQTAIL(q, QTAIL(q));
1089
1090 iohdr = (io_hdr *) kmem_zalloc(sizeof (io_hdr), KM_NOSLEEP);
1091 vec = (nsc_vec_t *) kmem_zalloc(sizeof (nsc_vec_t) * numvecs,
1092 KM_NOSLEEP);
1093
1094 if (!vec || !iohdr) {
1095 if (!vec) {
1096 cmn_err(CE_WARN, "!vec kmem alloc failed");
1097 } else {
1098 cmn_err(CE_WARN, "!iohdr kmem alloc failed");
1099 }
1100 if (vec)
1101 kmem_free(vec, sizeof (*vec));
1102 if (iohdr)
1103 kmem_free(iohdr, sizeof (*iohdr));
1104 CLR_QSTATE(q, QTAILBUSY);
1105 SET_LASTQTAIL(q, 0);
1106 mutex_exit(QLOCK(q));
1107 if (aio->handle)
1108 (void) nsc_free_buf(aio->handle);
1109 return (ENOMEM);
1110 }
1111
1112 vec[numvecs - 1].sv_len = 0;
1113 vec[numvecs - 1].sv_addr = 0;
1114
1115 /* now add the write itself */
1116 bp = aio->handle;
1117
1118 for (i = 1, j = 0; bp && bp->sb_vec[j].sv_addr &&
1119 i < numvecs; i++, j++) {
1120 vec[i].sv_len = bp->sb_vec[j].sv_len;
1121 vec[i].sv_addr = bp->sb_vec[j].sv_addr;
1122 }
1123
1124 retry:
1125
1126 /* check for queue wrap, then check for overflow */
1127 if (IS_STATE(urdc, RDC_DISKQ_FAILED) ||
1128 (IS_STATE(urdc, RDC_LOGGING) && !IS_STATE(urdc, RDC_QUEUING))) {
1129 kmem_free(iohdr, sizeof (*iohdr));
1130 kmem_free(vec, sizeof (*vec) * numvecs);
1131 CLR_QSTATE(q, QTAILBUSY);
1132 SET_LASTQTAIL(q, 0);
1133 if (IS_QSTATE(q, RDC_QFULL)) { /* wakeup blocked threads */
1134 CLR_QSTATE(q, RDC_QFULL);
1135 cv_broadcast(&q->qfullcv);
1136 }
1137 mutex_exit(QLOCK(q));
1138 if (aio->handle)
1139 (void) nsc_free_buf(aio->handle);
1140
1141 return (-1);
1142 }
1143
1144 if (QTAILSHLDWRAP(q, iofbas)) {
1145 /*
1146 * just go back to the beginning of the disk
1147 * it's not worth the trouble breaking up the write
1148 */
1149 #ifdef DEBUG_DISKQWRAP
1150 cmn_err(CE_NOTE, "!wrapping Q tail: " QDISPLAY(q));
1151 #endif
1152 /*LINTED*/
1153 WRAPQTAIL(q);
1154 }
1155
1156 /*
1157 * prepend the write's metadata
1158 */
1159 rdc_fill_ioheader(aio, iohdr, QTAIL(q));
1160
1161 vec[0].sv_len = FBA_SIZE(1);
1162 vec[0].sv_addr = (uchar_t *)iohdr;
1163
1164 /* check for tail < head */
1165
1166 if (!(FITSONQ(q, iofbas))) {
1167 /*
1168 * don't allow any more writes to start
1169 */
1170 SET_QSTATE(q, RDC_QFULL);
1171 mutex_exit(QLOCK(q));
1172
1173 if ((!group->rdc_writer) && !IS_STATE(urdc, RDC_LOGGING))
1174 (void) rdc_writer(krdc->index);
1175
1176 delay(delay_time);
1177 q->throttle_delay += delay_time;
1178 retries++;
1179 delay_time *= 2; /* fairly aggressive */
1180 if ((retries >= 8) || (delay_time >= 256)) {
1181 delay_time = 2;
1182 if (print_msg) {
1183 cmn_err(CE_WARN, "!enqueue: disk queue %s full",
1184 &urdc->disk_queue[0]);
1185 print_msg = 0;
1186 #ifdef DEBUG
1187 cmn_err(CE_WARN, "!qinfo: " QDISPLAY(q));
1188 #else
1189 cmn_err(CE_CONT, "!qinfo: " QDISPLAYND(q));
1190 #endif
1191 }
1192 /*
1193 * if this is a no-block queue, or this is a blocking
1194 * queue that is not flushing. reset and log
1195 */
1196 if ((QSTATE(q) & RDC_QNOBLOCK) ||
1197 (IS_STATE(urdc, RDC_QUEUING))) {
1198
1199 if (IS_STATE(urdc, RDC_QUEUING)) {
1200 cmn_err(CE_WARN, "!SNDR: disk queue %s full and not flushing. "
1201 "giving up", &urdc->disk_queue[0]);
1202 cmn_err(CE_WARN, "!SNDR: %s:%s entering logging mode",
1203 urdc->secondary.intf, urdc->secondary.file);
1204 }
1205
1206 rdc_fail_diskq(krdc, RDC_WAIT,
1207 RDC_DOLOG | RDC_NOFAIL);
1208 kmem_free(iohdr, sizeof (*iohdr));
1209 kmem_free(vec, sizeof (*vec) * numvecs);
1210 mutex_enter(QLOCK(q));
1211 CLR_QSTATE(q, QTAILBUSY | RDC_QFULL);
1212 cv_broadcast(&q->qfullcv);
1213 mutex_exit(QLOCK(q));
1214 SET_LASTQTAIL(q, 0);
1215 if (aio->handle)
1216 (void) nsc_free_buf(aio->handle);
1217 return (ENOMEM);
1218 }
1219 }
1220
1221 mutex_enter(QLOCK(q));
1222 goto retry;
1223
1224 }
1225
1226 qtail = QTAIL(q);
1227 #ifdef DEBUG_WRITER_UBERNOISE
1228 qhead = QHEAD(q);
1229 #endif
1230
1231 /* update tail pointer, nitems on queue and blocks on queue */
1232 INC_QTAIL(q, iofbas); /* increment tail over i/o size + ioheader size */
1233 INC_QNITEMS(q, 1);
1234 /* increment counter for i/o blocks only */
1235 INC_QBLOCKS(q, (iofbas - FBA_LEN(sizeof (io_hdr))));
1236
1237 if (QNITEMS(q) > q->nitems_hwm)
1238 q->nitems_hwm = QNITEMS(q);
1239 if (QBLOCKS(q) > q->blocks_hwm)
1240 q->blocks_hwm = QBLOCKS(q);
1241
1242 if (IS_QSTATE(q, RDC_QFULL)) {
1243 CLR_QSTATE(q, RDC_QFULL);
1244 cv_broadcast(&q->qfullcv);
1245 }
1246
1247 mutex_exit(QLOCK(q));
1248
1249 /*
1250 * if (krdc->io_kstats) {
1251 * mutex_enter(krdc->io_kstats->ks_lock);
1252 * kstat_waitq_enter(KSTAT_IO_PTR(krdc->io_kstats));
1253 * mutex_exit(krdc->io_kstats->ks_lock);
1254 * }
1255 */
1256
1257 DTRACE_PROBE(rdc_diskq_rsrv);
1258
1259 if (_rdc_rsrv_diskq(group)) {
1260 cmn_err(CE_WARN, "!rdc_enqueue: %s reserve failed",
1261 &urdc->disk_queue[0]);
1262 rdc_fail_diskq(krdc, RDC_WAIT, RDC_DOLOG);
1263 kmem_free(iohdr, sizeof (*iohdr));
1264 kmem_free(vec, sizeof (*vec) * numvecs);
1265 mutex_enter(QLOCK(q));
1266 CLR_QSTATE(q, QTAILBUSY);
1267 SET_LASTQTAIL(q, 0);
1268 mutex_exit(QLOCK(q));
1269 if (aio->handle)
1270 (void) nsc_free_buf(aio->handle);
1271 return (-1);
1272 }
1273
1274 /* XXX for now do this, but later pre-alloc handle in enable/resume */
1275
1276 DTRACE_PROBE(rdc_diskq_alloc_start);
1277 rc = nsc_alloc_buf(group->diskqfd, qtail, iofbas,
1278 NSC_NOCACHE | NSC_WRITE | NSC_NODATA, &qbuf);
1279
1280 DTRACE_PROBE(rdc_diskq_alloc_end);
1281
1282 if (!RDC_SUCCESS(rc)) {
1283 cmn_err(CE_WARN, "!disk queue %s alloc failed(%d) %" NSC_SZFMT,
1284 &urdc->disk_queue[0], rc, iofbas);
1285 rdc_fail_diskq(krdc, RDC_WAIT, RDC_DOLOG);
1286 rc = ENOMEM;
1287 goto fail;
1288 }
1289 /* move vec and write to queue */
1290 qbuf->sb_vec = &vec[0];
1291
1292 #ifdef DEBUG_WRITER_UBERNOISE
1293
1294 cmn_err(CE_NOTE, "!about to write to queue, qbuf: %p, qhead: %d, "
1295 "qtail: %d, len: %d contents: %c%c%c%c%c",
1296 (void *) qbuf, qhead, qtail, iofbas,
1297 qbuf->sb_vec[1].sv_addr[0],
1298 qbuf->sb_vec[1].sv_addr[1],
1299 qbuf->sb_vec[1].sv_addr[2],
1300 qbuf->sb_vec[1].sv_addr[3],
1301 qbuf->sb_vec[1].sv_addr[4]);
1302 cmn_err(CE_CONT, "!qinfo: " QDISPLAYND(q));
1303
1304 #endif
1305
1306 DTRACE_PROBE2(rdc_diskq_nswrite_start, int, qtail, nsc_size_t, iofbas);
1307 rc = nsc_write(qbuf, qtail, iofbas, 0);
1308 DTRACE_PROBE2(rdc_diskq_nswrite_end, int, qtail, nsc_size_t, iofbas);
1309
1310 if (!RDC_SUCCESS(rc)) {
1311 cmn_err(CE_WARN, "!disk queue %s write failed %d",
1312 &urdc->disk_queue[0], rc);
1313 rdc_fail_diskq(krdc, RDC_WAIT, RDC_DOLOG);
1314 goto fail;
1315
1316 }
1317
1318 mutex_enter(QLOCK(q));
1319
1320 SET_LASTQTAIL(q, 0);
1321 CLR_QSTATE(q, QTAILBUSY);
1322
1323 mutex_exit(QLOCK(q));
1324
1325 fail:
1326
1327 /*
1328 * return what should be returned
1329 * the aio is returned in _rdc_write after status is gathered.
1330 */
1331
1332 if (qbuf)
1333 qbuf->sb_vec = 0;
1334 (void) nsc_free_buf(qbuf);
1335
1336 if (aio->handle)
1337 (void) nsc_free_buf(aio->handle);
1338
1339 _rdc_rlse_diskq(group);
1340 DTRACE_PROBE(rdc_diskq_rlse);
1341
1342 /* free the iohdr and the vecs */
1343
1344 if (iohdr)
1345 kmem_free(iohdr, sizeof (*iohdr));
1346 if (vec)
1347 kmem_free(vec, sizeof (*vec) * numvecs);
1348
1349 /* if no flusher running, start one */
1350 if ((!krdc->group->rdc_writer) && !IS_STATE(urdc, RDC_LOGGING))
1351 (void) rdc_writer(krdc->index);
1352
1353 return (rc);
1354 }
1355
1356 /*
1357 * place this on the pending list of io_hdr's out for flushing
1358 */
1359 void
1360 rdc_add_iohdr(io_hdr *header, rdc_group_t *group)
1361 {
1362 disk_queue *q = NULL;
1363 #ifdef DEBUG
1364 io_hdr *p;
1365 #endif
1366
1367 q = &group->diskq;
1368
1369 /* paranoia */
1370 header->dat.next = NULL;
1371
1372 mutex_enter(QLOCK(q));
1373 #ifdef DEBUG /* AAAH! double flush!? */
1374 p = q->iohdrs;
1375 while (p) {
1376 if (p->dat.qpos == header->dat.qpos) {
1377 cmn_err(CE_WARN, "!ADDING DUPLICATE HEADER %" NSC_SZFMT,
1378 p->dat.qpos);
1379 kmem_free(header, sizeof (*header));
1380 mutex_exit(QLOCK(q));
1381 return;
1382 }
1383 p = p->dat.next;
1384 }
1385 #endif
1386 if (q->iohdrs == NULL) {
1387 q->iohdrs = q->hdr_last = header;
1388 q->hdrcnt = 1;
1389 mutex_exit(QLOCK(q));
1390 return;
1391 }
1392
1393 q->hdr_last->dat.next = header;
1394 q->hdr_last = header;
1395 q->hdrcnt++;
1396 mutex_exit(QLOCK(q));
1397 return;
1398
1399 }
1400
1401 /*
1402 * mark an io header as flushed. If it is the qhead,
1403 * then update the qpointers
1404 * free the io_hdrs
1405 * called after the bitmap is cleared by flusher
1406 */
1407 void
1408 rdc_clr_iohdr(rdc_k_info_t *krdc, nsc_size_t qpos)
1409 {
1410 rdc_group_t *group = krdc->group;
1411 disk_queue *q = NULL;
1412 io_hdr *hp = NULL;
1413 io_hdr *p = NULL;
1414 int found = 0;
1415 int cnt = 0;
1416
1417 #ifndef NSC_MULTI_TERABYTE
1418 ASSERT(qpos >= 0); /* assertion to validate change for 64bit */
1419 if (qpos < 0) /* not a diskq offset */
1420 return;
1421 #endif
1422
1423 q = &group->diskq;
1424 mutex_enter(QLOCK(q));
1425
1426 hp = p = q->iohdrs;
1427
1428 /* find outstanding io_hdr */
1429 while (hp) {
1430 if (hp->dat.qpos == qpos) {
1431 found++;
1432 break;
1433 }
1434 cnt++;
1435 p = hp;
1436 hp = hp->dat.next;
1437 }
1438
1439 if (!found) {
1440 if (RDC_BETWEEN(QHEAD(q), QNXTIO(q), qpos)) {
1441 #ifdef DEBUG
1442 cmn_err(CE_WARN, "!iohdr already cleared? "
1443 "qpos %" NSC_SZFMT " cnt %d ", qpos, cnt);
1444 cmn_err(CE_WARN, "!Qinfo: " QDISPLAY(q));
1445 #endif
1446 mutex_exit(QLOCK(q));
1447 return;
1448 }
1449 mutex_exit(QLOCK(q));
1450 return;
1451 }
1452
1453 /* mark it as flushed */
1454 hp->dat.iostatus = RDC_IOHDR_DONE;
1455
1456 /*
1457 * if it is the head pointer, travel the list updating the queue
1458 * pointers until the next unflushed is reached, freeing on the way.
1459 */
1460 while (hp && (hp->dat.qpos == QHEAD(q)) &&
1461 (hp->dat.iostatus == RDC_IOHDR_DONE)) {
1462 #ifdef DEBUG_FLUSHER_UBERNOISE
1463 cmn_err(CE_NOTE, "!clr_iohdr info: magic %x type %d pos %d"
1464 " qpos %d hpos %d len %d flag 0x%x iostatus %x setid %d",
1465 hp->dat.magic, hp->dat.type, hp->dat.pos, hp->dat.qpos,
1466 hp->dat.hpos, hp->dat.len, hp->dat.flag,
1467 hp->dat.iostatus, hp->dat.setid);
1468 #endif
1469 if (hp->dat.flag & RDC_NULL_BUF) {
1470 INC_QHEAD(q, FBA_LEN(sizeof (io_hdr)));
1471 } else {
1472 INC_QHEAD(q, FBA_LEN(sizeof (io_hdr)) + hp->dat.len);
1473 DEC_QBLOCKS(q, hp->dat.len);
1474 }
1475
1476 DEC_QNITEMS(q, 1);
1477
1478 if (QHEADSHLDWRAP(q)) { /* simple enough */
1479 #ifdef DEBUG_DISKQWRAP
1480 cmn_err(CE_NOTE, "!wrapping Q head: " QDISPLAY(q));
1481 #endif
1482 /*LINTED*/
1483 WRAPQHEAD(q);
1484 }
1485
1486 /* get rid of the iohdr */
1487 if (hp == q->iohdrs) {
1488 q->iohdrs = hp->dat.next;
1489 kmem_free(hp, sizeof (*hp));
1490 hp = q->iohdrs;
1491 } else {
1492 if (hp == q->hdr_last)
1493 q->hdr_last = p;
1494 p->dat.next = hp->dat.next;
1495 kmem_free(hp, sizeof (*hp));
1496 hp = p->dat.next;
1497 }
1498 q->hdrcnt--;
1499 }
1500
1501 if (QEMPTY(q) && !IS_QSTATE(q, RDC_QFULL) &&
1502 !(IS_QSTATE(q, RDC_QDISABLEPEND))) {
1503 #ifdef DEBUG_FLUSHER_UBERNOISE
1504 rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
1505 cmn_err(CE_NOTE, "!clr_iohdr: diskq %s empty, "
1506 "resetting defaults", urdc->disk_queue);
1507 #endif
1508
1509 rdc_init_diskq_header(group, &q->disk_hdr);
1510 SET_QNXTIO(q, QHEAD(q));
1511 }
1512
1513 /* wakeup any blocked enqueue threads */
1514 cv_broadcast(&q->qfullcv);
1515 mutex_exit(QLOCK(q));
1516 }
1517
1518 /*
1519 * put in whatever useful checks we can on the io header
1520 */
1521 int
1522 rdc_iohdr_ok(io_hdr *hdr)
1523 {
1524 if (hdr->dat.magic != RDC_IOHDR_MAGIC)
1525 goto bad;
1526 return (1);
1527 bad:
1528
1529 #ifdef DEBUG
1530 cmn_err(CE_WARN, "!Bad io header magic %x type %d pos %" NSC_SZFMT
1531 " hpos %" NSC_SZFMT " qpos %" NSC_SZFMT " len %" NSC_SZFMT
1532 " flag %d iostatus %d setid %d", hdr->dat.magic,
1533 hdr->dat.type, hdr->dat.pos, hdr->dat.hpos, hdr->dat.qpos,
1534 hdr->dat.len, hdr->dat.flag, hdr->dat.iostatus, hdr->dat.setid);
1535 #else
1536 cmn_err(CE_WARN, "!Bad io header retrieved");
1537 #endif
1538 return (0);
1539 }
1540
1541 /*
1542 * rdc_netqueue_insert()
1543 * add an item to a netqueue. No locks necessary as it should only
1544 * be used in a single threaded manor. If that changes, then
1545 * a lock or assertion should be done here
1546 */
1547 void
1548 rdc_netqueue_insert(rdc_aio_t *aio, net_queue *q)
1549 {
1550 rdc_k_info_t *krdc = &rdc_k_info[aio->index];
1551
1552 /* paranoid check for bit set */
1553 RDC_CHECK_BIT(krdc, aio->pos, aio->len);
1554
1555 if (q->net_qhead == NULL) {
1556 q->net_qhead = q->net_qtail = aio;
1557
1558 } else {
1559 q->net_qtail->next = aio;
1560 q->net_qtail = aio;
1561 }
1562 q->blocks += aio->len;
1563 q->nitems++;
1564
1565 if (q->nitems > q->nitems_hwm) {
1566 q->nitems_hwm = q->nitems;
1567 }
1568 if (q->blocks > q->blocks_hwm) {
1569 q->nitems_hwm = q->blocks;
1570 }
1571 }
1572
1573 /*
1574 * rdc_fill_aio(aio, hdr)
1575 * take the pertinent info from an io_hdr and stick it in
1576 * an aio, including seq number, abuf.
1577 */
1578 void
1579 rdc_fill_aio(rdc_group_t *grp, rdc_aio_t *aio, io_hdr *hdr, nsc_buf_t *abuf)
1580 {
1581 if (hdr->dat.flag & RDC_NULL_BUF) {
1582 aio->handle = NULL;
1583 } else {
1584 aio->handle = abuf;
1585 }
1586 aio->qhandle = abuf;
1587 aio->pos = hdr->dat.pos;
1588 aio->qpos = hdr->dat.qpos;
1589 aio->len = hdr->dat.len;
1590 aio->flag = hdr->dat.flag;
1591 if ((aio->index = rdc_setid2idx(hdr->dat.setid)) < 0)
1592 return;
1593 mutex_enter(&grp->diskq.disk_qlock);
1594 if (grp->ra_queue.qfflags & RDC_QFILLSLEEP) {
1595 mutex_exit(&grp->diskq.disk_qlock);
1596 aio->seq = RDC_NOSEQ;
1597 return;
1598 }
1599 if (abuf && aio->qhandle) {
1600 abuf->sb_user++;
1601 }
1602 aio->seq = grp->seq++;
1603 if (grp->seq < aio->seq)
1604 grp->seq = RDC_NEWSEQ + 1;
1605 mutex_exit(&grp->diskq.disk_qlock);
1606 hdr->dat.iostatus = aio->seq;
1607
1608 }
1609
1610 #ifdef DEBUG
1611 int maxaios_perbuf = 0;
1612 int midaios_perbuf = 0;
1613 int aveaios_perbuf = 0;
1614 int totaios_perbuf = 0;
1615 int buf2qcalls = 0;
1616
1617 void
1618 calc_perbuf(int items)
1619 {
1620 if (totaios_perbuf < 0) {
1621 maxaios_perbuf = 0;
1622 midaios_perbuf = 0;
1623 aveaios_perbuf = 0;
1624 totaios_perbuf = 0;
1625 buf2qcalls = 0;
1626 }
1627
1628 if (items > maxaios_perbuf)
1629 maxaios_perbuf = items;
1630 midaios_perbuf = maxaios_perbuf / 2;
1631 totaios_perbuf += items;
1632 aveaios_perbuf = totaios_perbuf / buf2qcalls;
1633 }
1634 #endif
1635
1636 /*
1637 * rdc_discard_tmpq()
1638 * free up the passed temporary queue
1639 * NOTE: no cv's or mutexes have been initialized
1640 */
1641 void
1642 rdc_discard_tmpq(net_queue *q)
1643 {
1644 rdc_aio_t *aio;
1645
1646 if (q == NULL)
1647 return;
1648
1649 while (q->net_qhead) {
1650 aio = q->net_qhead;
1651 q->net_qhead = q->net_qhead->next;
1652 if (aio->qhandle) {
1653 aio->qhandle->sb_user--;
1654 if (aio->qhandle->sb_user == 0) {
1655 rdc_fixlen(aio);
1656 (void) nsc_free_buf(aio->qhandle);
1657 }
1658 }
1659 kmem_free(aio, sizeof (*aio));
1660 q->nitems--;
1661 }
1662 kmem_free(q, sizeof (*q));
1663
1664 }
1665
1666 /*
1667 * rdc_diskq_buf2queue()
1668 * take a chunk of the diskq, parse it and assemble
1669 * a chain of rdc_aio_t's.
1670 * updates QNXTIO()
1671 */
1672 net_queue *
1673 rdc_diskq_buf2queue(rdc_group_t *grp, nsc_buf_t **abuf, int index)
1674 {
1675 rdc_aio_t *aio = NULL;
1676 nsc_vec_t *vecp = NULL;
1677 uchar_t *vaddr = NULL;
1678 uchar_t *ioaddr = NULL;
1679 net_queue *netq = NULL;
1680 io_hdr *hdr = NULL;
1681 nsc_buf_t *buf = *abuf;
1682 rdc_u_info_t *urdc = &rdc_u_info[index];
1683 rdc_k_info_t *krdc = &rdc_k_info[index];
1684 disk_queue *dq = &grp->diskq;
1685 net_queue *nq = &grp->ra_queue;
1686 int nullbuf = 0;
1687 nsc_off_t endobuf;
1688 nsc_off_t bufoff;
1689 int vlen;
1690 nsc_off_t fpos;
1691 long bufcnt = 0;
1692 int nullblocks = 0;
1693 int fail = 1;
1694
1695 if (buf == NULL)
1696 return (NULL);
1697
1698 netq = kmem_zalloc(sizeof (*netq), KM_NOSLEEP);
1699 if (netq == NULL) {
1700 cmn_err(CE_WARN, "!SNDR: unable to allocate net queue");
1701 return (NULL);
1702 }
1703
1704 vecp = buf->sb_vec;
1705 vlen = vecp->sv_len;
1706 vaddr = vecp->sv_addr;
1707 bufoff = buf->sb_pos;
1708 endobuf = bufoff + buf->sb_len;
1709
1710 #ifdef DEBUG_FLUSHER_UBERNOISE
1711 cmn_err(CE_WARN, "!BUFFOFFENTER %d", bufoff);
1712 #endif
1713 /* CONSTCOND */
1714 while (1) {
1715 if (IS_STATE(urdc, RDC_LOGGING) ||
1716 (nq->qfflags & RDC_QFILLSLEEP)) {
1717 fail = 0;
1718 goto fail;
1719 }
1720 #ifdef DEBUG_FLUSHER_UBERNOISE
1721 cmn_err(CE_WARN, "!BUFFOFF_0 %d", bufoff);
1722 #endif
1723
1724 if ((vaddr == NULL) || (vlen == 0))
1725 break;
1726
1727 if (vlen <= 0) {
1728 vecp++;
1729 vaddr = vecp->sv_addr;
1730 vlen = vecp->sv_len;
1731 if (vaddr == NULL)
1732 break;
1733 }
1734
1735 /* get the iohdr information */
1736
1737 hdr = kmem_zalloc(sizeof (*hdr), KM_NOSLEEP);
1738 if (hdr == NULL) {
1739 cmn_err(CE_WARN,
1740 "!SNDR: unable to alocate net queue header");
1741 goto fail;
1742 }
1743
1744 ioaddr = (uchar_t *)hdr;
1745
1746 bcopy(vaddr, ioaddr, sizeof (*hdr));
1747
1748 if (!rdc_iohdr_ok(hdr)) {
1749 cmn_err(CE_WARN,
1750 "!unable to retrieve i/o data from queue %s "
1751 "at offset %" NSC_SZFMT " bp: %" NSC_SZFMT " bl: %"
1752 NSC_SZFMT, urdc->disk_queue,
1753 bufoff, buf->sb_pos, buf->sb_len);
1754 #ifdef DEBUG_DISKQ
1755 cmn_err(CE_WARN, "!FAILING QUEUE state: %x",
1756 rdc_get_vflags(urdc));
1757 cmn_err(CE_WARN, "!qinfo: " QDISPLAY(dq));
1758 cmn_err(CE_WARN, "!VADDR %p, IOADDR %p", vaddr, ioaddr);
1759 cmn_err(CE_WARN, "!BUF %p", buf);
1760 #endif
1761 cmn_err(CE_WARN, "!qinfo: " QDISPLAYND(dq));
1762
1763 goto fail;
1764 }
1765
1766 nullbuf = hdr->dat.flag & RDC_NULL_BUF;
1767
1768 bufoff += FBA_NUM(sizeof (*hdr));
1769
1770 /* out of buffer, set nxtio to re read this last hdr */
1771 if (!nullbuf && ((bufoff + hdr->dat.len) > endobuf)) {
1772 break;
1773 }
1774
1775 bufcnt += FBA_NUM(sizeof (*hdr));
1776
1777 aio = kmem_zalloc(sizeof (*aio), KM_NOSLEEP);
1778 if (aio == NULL) {
1779 bufcnt -= FBA_NUM(sizeof (*hdr));
1780 cmn_err(CE_WARN, "!SNDR: net queue aio alloc failed");
1781 goto fail;
1782 }
1783
1784 if (!nullbuf) {
1785 /* move to next iohdr in big buf */
1786 bufoff += hdr->dat.len;
1787 bufcnt += hdr->dat.len;
1788 }
1789
1790 rdc_fill_aio(grp, aio, hdr, buf);
1791
1792 if (aio->index < 0) {
1793 cmn_err(CE_WARN, "!Set id %d not found or no longer "
1794 "enabled, failing disk queue", hdr->dat.setid);
1795 kmem_free(aio, sizeof (*aio));
1796 goto fail;
1797 }
1798 if (aio->seq == RDC_NOSEQ) {
1799 kmem_free(aio, sizeof (*aio));
1800 fail = 0;
1801 goto fail;
1802 }
1803 if (aio->handle == NULL)
1804 nullblocks += aio->len;
1805
1806 rdc_add_iohdr(hdr, grp);
1807 hdr = NULL; /* don't accidentally free on break or fail */
1808 rdc_netqueue_insert(aio, netq);
1809
1810 /* no more buffer, skip the below logic */
1811 if ((bufoff + FBA_NUM(sizeof (*hdr))) >= endobuf) {
1812 break;
1813 }
1814
1815 fpos = bufoff - buf->sb_pos;
1816 vecp = buf->sb_vec;
1817 for (; fpos >= FBA_NUM(vecp->sv_len); vecp++)
1818 fpos -= FBA_NUM(vecp->sv_len);
1819 vlen = vecp->sv_len - FBA_SIZE(fpos);
1820 vaddr = vecp->sv_addr + FBA_SIZE(fpos);
1821 /* abuf = NULL; */
1822
1823 }
1824
1825 /* free extraneous header */
1826 if (hdr) {
1827 kmem_free(hdr, sizeof (*hdr));
1828 hdr = NULL;
1829 }
1830
1831 /*
1832 * probably won't happen, but if we didn't goto fail, but
1833 * we don't contain anything meaningful.. return NULL
1834 * and let the flusher or the sleep/wakeup routines
1835 * decide
1836 */
1837 if (netq && netq->nitems == 0) {
1838 kmem_free(netq, sizeof (*netq));
1839 return (NULL);
1840 }
1841
1842 #ifdef DEBUG
1843 buf2qcalls++;
1844 calc_perbuf(netq->nitems);
1845 #endif
1846 if (IS_STATE(urdc, RDC_LOGGING) ||
1847 nq->qfflags & RDC_QFILLSLEEP) {
1848 fail = 0;
1849 goto fail;
1850 }
1851
1852 mutex_enter(QLOCK(dq));
1853 INC_QNXTIO(dq, bufcnt);
1854 mutex_exit(QLOCK(dq));
1855
1856 netq->net_qtail->orig_len = nullblocks; /* overload */
1857
1858 return (netq);
1859
1860 fail:
1861
1862 if (hdr) {
1863 kmem_free(hdr, sizeof (*hdr));
1864 }
1865
1866 if (netq) {
1867 if (netq->nitems > 0) {
1868 /* the never can happen case ... */
1869 if ((netq->nitems == 1) &&
1870 (netq->net_qhead->handle == NULL)) {
1871 (void) nsc_free_buf(buf);
1872 *abuf = NULL;
1873 }
1874
1875 }
1876 rdc_discard_tmpq(netq);
1877 }
1878
1879 mutex_enter(QLOCK(dq));
1880 rdc_dump_iohdrs(dq);
1881 mutex_exit(QLOCK(dq));
1882
1883 if (fail) { /* real failure, not just state change */
1884 #ifdef DEBUG
1885 cmn_err(CE_WARN, "!rdc_diskq_buf2queue: failing disk queue %s",
1886 urdc->disk_queue);
1887 #endif
1888 rdc_fail_diskq(krdc, RDC_NOWAIT, RDC_DOLOG);
1889 }
1890
1891 return (NULL);
1892
1893 }
1894
1895 /*
1896 * rdc_diskq_unqueue
1897 * remove one chunk from the diskq belonging to
1898 * rdc_k_info[index]
1899 * updates the head and tail pointers in the disk header
1900 * but does not write. The header should be written on ack
1901 * flusher should free whatever..
1902 */
1903 rdc_aio_t *
1904 rdc_diskq_unqueue(int index)
1905 {
1906 int rc, rc1, rc2;
1907 nsc_off_t qhead;
1908 int nullhandle = 0;
1909 io_hdr *iohdr;
1910 rdc_aio_t *aio = NULL;
1911 nsc_buf_t *buf = NULL;
1912 nsc_buf_t *abuf = NULL;
1913 rdc_group_t *group = NULL;
1914 disk_queue *q = NULL;
1915 rdc_k_info_t *krdc = &rdc_k_info[index];
1916 rdc_u_info_t *urdc = &rdc_u_info[index];
1917
1918 group = krdc->group;
1919 q = &group->diskq;
1920
1921 if (group->diskqfd == NULL) /* we've been disabled */
1922 return (NULL);
1923
1924 aio = kmem_zalloc(sizeof (*aio), KM_NOSLEEP);
1925 if (!aio) {
1926 return (NULL);
1927 }
1928
1929 iohdr = kmem_zalloc(sizeof (*iohdr), KM_NOSLEEP);
1930 if (!iohdr) {
1931 kmem_free(aio, sizeof (*aio));
1932 return (NULL);
1933 }
1934
1935 mutex_enter(QLOCK(q));
1936 rdc_set_qbusy(q); /* make sure no one disables the queue */
1937 mutex_exit(QLOCK(q));
1938
1939 DTRACE_PROBE(rdc_diskq_unq_rsrv);
1940
1941 if (_rdc_rsrv_diskq(group)) {
1942 cmn_err(CE_WARN, "!rdc_unqueue: %s reserve failed",
1943 urdc->disk_queue);
1944 goto fail;
1945 }
1946
1947 mutex_enter(QHEADLOCK(q));
1948 mutex_enter(QLOCK(q));
1949
1950 if (IS_STATE(urdc, RDC_DISKQ_FAILED) || IS_STATE(urdc, RDC_LOGGING)) {
1951 rdc_clr_qbusy(q);
1952 mutex_exit(QLOCK(q));
1953 mutex_exit(QHEADLOCK(q));
1954 kmem_free(aio, sizeof (*aio));
1955 kmem_free(iohdr, sizeof (*iohdr));
1956 return (NULL);
1957 }
1958
1959 if (QNXTIOSHLDWRAP(q)) {
1960 #ifdef DEBUG_DISKQWRAP
1961 cmn_err(CE_NOTE, "!wrapping Q nxtio: " QDISPLAY(q));
1962 #endif
1963 /*LINTED*/
1964 WRAPQNXTIO(q);
1965 }
1966
1967 /* read the metainfo at q->nxt_io first */
1968 if (QNXTIO(q) == QTAIL(q)) { /* empty */
1969
1970 _rdc_rlse_diskq(group);
1971 if (q->lastio->handle)
1972 (void) nsc_free_buf(q->lastio->handle);
1973 bzero(&(*q->lastio), sizeof (*q->lastio));
1974
1975 mutex_exit(QHEADLOCK(q));
1976 rdc_clr_qbusy(q);
1977 mutex_exit(QLOCK(q));
1978 kmem_free(aio, sizeof (*aio));
1979 kmem_free(iohdr, sizeof (*iohdr));
1980 return (NULL);
1981 }
1982
1983 qhead = QNXTIO(q);
1984
1985 /*
1986 * have to drop the lock here, sigh. Cannot block incoming io
1987 * we have to wait until after this read to find out how
1988 * much to increment QNXTIO. Might as well grab the seq then too
1989 */
1990
1991 while ((qhead == LASTQTAIL(q)) && (IS_QSTATE(q, QTAILBUSY))) {
1992 mutex_exit(QLOCK(q));
1993 #ifdef DEBUG_DISKQ
1994 cmn_err(CE_NOTE, "!Qtail busy delay lastqtail: %d", qhead);
1995 #endif
1996 delay(5);
1997 mutex_enter(QLOCK(q));
1998 }
1999 mutex_exit(QLOCK(q));
2000
2001 DTRACE_PROBE(rdc_diskq_iohdr_read_start);
2002
2003 rc = rdc_ns_io(group->diskqfd, NSC_READ, qhead,
2004 (uchar_t *)iohdr, FBA_SIZE(1));
2005
2006 DTRACE_PROBE(rdc_diskq_iohdr_read_end);
2007
2008 if (!RDC_SUCCESS(rc) || !rdc_iohdr_ok(iohdr)) {
2009 cmn_err(CE_WARN, "!unable to retrieve i/o data from queue %s"
2010 " at offset %" NSC_SZFMT " rc %d", urdc->disk_queue,
2011 qhead, rc);
2012 #ifdef DEBUG_DISKQ
2013 cmn_err(CE_WARN, "!qinfo: " QDISPLAY(q));
2014 #endif
2015 mutex_exit(QHEADLOCK(q));
2016 goto fail;
2017 }
2018
2019 /* XXX process buffer here, creating rdc_aio_t's */
2020
2021 mutex_enter(QLOCK(q));
2022 /* update the next pointer */
2023 if (iohdr->dat.flag == RDC_NULL_BUF) {
2024 INC_QNXTIO(q, FBA_LEN(sizeof (io_hdr)));
2025 nullhandle = 1;
2026 } else {
2027 INC_QNXTIO(q, (FBA_LEN(sizeof (io_hdr)) + iohdr->dat.len));
2028 }
2029
2030 aio->seq = group->seq++;
2031 if (group->seq < aio->seq)
2032 group->seq = RDC_NEWSEQ + 1;
2033
2034 mutex_exit(QLOCK(q));
2035 mutex_exit(QHEADLOCK(q));
2036
2037 #ifdef DEBUG_FLUSHER_UBERNOISE
2038 p = &iohdr->dat;
2039 cmn_err(CE_NOTE, "!unqueued iohdr from %d pos: %d len: %d flag: %d "
2040 "iostatus: %d setid: %d time: %d", qhead, p->pos, p->len,
2041 p->flag, p->iostatus, p->setid, p->time);
2042 #endif
2043
2044 if (nullhandle) /* nothing to get from queue */
2045 goto nullbuf;
2046
2047 /* now that we know how much to get (iohdr.dat.len), get it */
2048 DTRACE_PROBE(rdc_diskq_unq_allocbuf1_start);
2049
2050 rc = nsc_alloc_buf(group->diskqfd, qhead + 1, iohdr->dat.len,
2051 NSC_NOCACHE | NSC_READ, &buf);
2052
2053 DTRACE_PROBE(rdc_diskq_unq_allocbuf1_end);
2054
2055 /* and get somewhere to keep it for a bit */
2056 DTRACE_PROBE(rdc_diskq_unq_allocbuf2_start);
2057
2058 rc1 = nsc_alloc_abuf(qhead + 1, iohdr->dat.len, 0, &abuf);
2059
2060 DTRACE_PROBE(rdc_diskq_unq_allocbuf2_end);
2061
2062 if (!RDC_SUCCESS(rc) || !RDC_SUCCESS(rc1)) { /* uh-oh */
2063 cmn_err(CE_WARN, "!disk queue %s read failure",
2064 urdc->disk_queue);
2065 goto fail;
2066 }
2067
2068 /* move it on over... */
2069 rc2 = nsc_copy(buf, abuf, qhead + 1, qhead + 1, iohdr->dat.len);
2070
2071 if (!RDC_SUCCESS(rc2)) {
2072 #ifdef DEBUG
2073 cmn_err(CE_WARN, "!nsc_copy failed for diskq unqueue");
2074 #endif
2075 goto fail;
2076 }
2077
2078 /* let go of the real buf, we've got the abuf */
2079 (void) nsc_free_buf(buf);
2080 buf = NULL;
2081
2082 aio->handle = abuf;
2083 /* Hack in the original sb_pos */
2084 aio->handle->sb_pos = iohdr->dat.hpos;
2085
2086 /* skip the RDC_HANDLE_LIMITS check */
2087 abuf->sb_user |= RDC_DISKQUE;
2088
2089 nullbuf:
2090 if (nullhandle) {
2091 aio->handle = NULL;
2092 }
2093
2094 /* set up the rest of the aio values, seq set above ... */
2095 aio->pos = iohdr->dat.pos;
2096 aio->qpos = iohdr->dat.qpos;
2097 aio->len = iohdr->dat.len;
2098 aio->flag = iohdr->dat.flag;
2099 aio->index = rdc_setid2idx(iohdr->dat.setid);
2100 if (aio->index < 0) { /* uh-oh */
2101 #ifdef DEBUG
2102 cmn_err(CE_WARN, "!rdc_diskq_unqueue: index < 0");
2103 #endif
2104 goto fail;
2105 }
2106
2107
2108 #ifdef DEBUG_FLUSHER_UBERNOISE_STAMP
2109 h = &q->disk_hdr.h;
2110 cmn_err(CE_NOTE, "!stamping diskq header:\n"
2111 "magic: %x\nstate: %d\nhead_offset: %d\n"
2112 "tail_offset: %d\ndisk_size: %d\nnitems: %d\nblocks: %d\n",
2113 h->magic, h->state, h->head_offset, h->tail_offset,
2114 h->disk_size, h->nitems, h->blocks);
2115 #endif
2116
2117 _rdc_rlse_diskq(group);
2118
2119 mutex_enter(QLOCK(q));
2120 rdc_clr_qbusy(q);
2121 mutex_exit(QLOCK(q));
2122
2123 DTRACE_PROBE(rdc_diskq_unq_rlse);
2124
2125 iohdr->dat.iostatus = aio->seq;
2126 rdc_add_iohdr(iohdr, group);
2127
2128 #ifdef DEBUG_FLUSHER_UBERNOISE
2129 if (!nullhandle) {
2130 cmn_err(CE_NOTE, "!UNQUEUING, %p"
2131 " contents: %c%c%c%c%c pos: %d len: %d",
2132 (void *)aio->handle,
2133 aio->handle->sb_vec[0].sv_addr[0],
2134 aio->handle->sb_vec[0].sv_addr[1],
2135 aio->handle->sb_vec[0].sv_addr[2],
2136 aio->handle->sb_vec[0].sv_addr[3],
2137 aio->handle->sb_vec[0].sv_addr[4],
2138 aio->handle->sb_pos, aio->handle->sb_len);
2139 } else {
2140 cmn_err(CE_NOTE, "!UNQUEUING, NULL " QDISPLAY(q));
2141 }
2142 cmn_err(CE_NOTE, "!qinfo: " QDISPLAY(q));
2143 #endif
2144
2145 return (aio);
2146
2147 fail:
2148 if (aio)
2149 kmem_free(aio, sizeof (*aio));
2150 if (iohdr)
2151 kmem_free(iohdr, sizeof (*iohdr));
2152 if (buf)
2153 (void) nsc_free_buf(buf);
2154 if (abuf)
2155 (void) nsc_free_buf(abuf);
2156
2157 _rdc_rlse_diskq(group);
2158 #ifdef DEBUG
2159 cmn_err(CE_WARN, "!diskq_unqueue: failing diskq");
2160 #endif
2161 mutex_enter(QLOCK(q));
2162 rdc_clr_qbusy(q);
2163 mutex_exit(QLOCK(q));
2164
2165 rdc_fail_diskq(krdc, RDC_NOWAIT, RDC_DOLOG);
2166
2167 return (NULL);
2168 }
2169
2170 int
2171 rdc_diskq_inuse(rdc_set_t *set, char *diskq)
2172 {
2173 rdc_u_info_t *urdc;
2174 char *group;
2175 int index;
2176
2177 group = set->group_name;
2178
2179 ASSERT(MUTEX_HELD(&rdc_conf_lock));
2180
2181 if ((rdc_lookup_bitmap(diskq) >= 0) ||
2182 (rdc_lookup_configured(diskq) >= 0)) {
2183 return (1);
2184 }
2185 for (index = 0; index < rdc_max_sets; index++) {
2186 urdc = &rdc_u_info[index];
2187
2188 if (!IS_ENABLED(urdc))
2189 continue;
2190
2191 /* same diskq different group */
2192 if ((strcmp(urdc->disk_queue, diskq) == 0) &&
2193 (urdc->group_name[0] == '\0' ||
2194 strcmp(urdc->group_name, group))) {
2195 return (1);
2196 }
2197 }
2198 /* last, but not least, lets see if someone is getting really funky */
2199 if ((strcmp(set->disk_queue, set->primary.file) == 0) ||
2200 (strcmp(set->disk_queue, set->primary.bitmap) == 0)) {
2201 return (1);
2202 }
2203
2204 return (0);
2205
2206 }
2207
2208 #ifdef DEBUG
2209 int maxlen = 0;
2210 int avelen = 0;
2211 int totalen = 0;
2212 int lencalls = 0;
2213
2214 void
2215 update_lenstats(int len)
2216 {
2217 if (lencalls == 0) {
2218 lencalls = 1;
2219 avelen = 0;
2220 maxlen = 0;
2221 totalen = 0;
2222 }
2223
2224 if (len > maxlen)
2225 maxlen = len;
2226 totalen += len;
2227 avelen = totalen / lencalls;
2228 }
2229 #endif
2230
2231 /*
2232 * rdc_calc_len()
2233 * returns the size of the diskq that can be read for dequeuing
2234 * always <= RDC_MAX_DISKQREAD
2235 */
2236 int
2237 rdc_calc_len(rdc_k_info_t *krdc, disk_queue *dq)
2238 {
2239 nsc_size_t len = 0;
2240
2241 ASSERT(MUTEX_HELD(QLOCK(dq)));
2242
2243 /* ---H-----N-----T--- */
2244 if (QNXTIO(dq) < QTAIL(dq)) {
2245
2246 len = min(RDC_MAX_DISKQREAD, QTAIL(dq) - QNXTIO(dq));
2247
2248 /* ---T-----H-----N--- */
2249 } else if (QNXTIO(dq) > QTAIL(dq)) {
2250 if (QWRAP(dq)) {
2251 len = min(RDC_MAX_DISKQREAD, QWRAP(dq) - QNXTIO(dq));
2252 } else { /* should never happen */
2253 len = min(RDC_MAX_DISKQREAD, QSIZE(dq) - QNXTIO(dq));
2254 }
2255 } else if (QNXTIO(dq) == QTAIL(dq)) {
2256 if (QWRAP(dq) && !IS_QSTATE(dq, QNXTIOWRAPD))
2257 len = min(RDC_MAX_DISKQREAD, QWRAP(dq) - QNXTIO(dq));
2258 }
2259
2260 len = min(len, krdc->maxfbas);
2261
2262 #ifdef DEBUG
2263 lencalls++;
2264 update_lenstats(len);
2265 #endif
2266
2267 return ((int)len);
2268 }
2269
2270 /*
2271 * lie a little if we can, so we don't get tied up in
2272 * _nsc_wait_dbuf() on the next read. sb_len MUST be
2273 * restored before nsc_free_buf() however, or we will
2274 * be looking at memory leak city..
2275 * so update the entire queue with the info as well
2276 * and the one that ends up freeing it, can fix the len
2277 * IMPORTANT: This assumes that we are not cached, in
2278 * 3.2 caching was turned off for data volumes, if that
2279 * changes, then this must too
2280 */
2281 void
2282 rdc_trim_buf(nsc_buf_t *buf, net_queue *q)
2283 {
2284 rdc_aio_t *p;
2285 int len;
2286
2287 if (buf == NULL || q == NULL)
2288 return;
2289
2290 if (q && (buf->sb_len >
2291 (q->blocks + q->nitems - q->net_qtail->orig_len))) {
2292 len = buf->sb_len;
2293 buf->sb_len = (q->blocks + q->nitems - q->net_qtail->orig_len);
2294 }
2295
2296 p = q->net_qhead;
2297 do {
2298 p->orig_len = len;
2299 p = p->next;
2300
2301 } while (p);
2302
2303 }
2304
2305 /*
2306 * rdc_read_diskq_buf()
2307 * read a large as possible chunk of the diskq into a nsc_buf_t
2308 * and convert it to a net_queue of rdc_aio_t's to be appended
2309 * to the group's netqueue
2310 */
2311 net_queue *
2312 rdc_read_diskq_buf(int index)
2313 {
2314 nsc_buf_t *buf = NULL;
2315 net_queue *tmpnq = NULL;
2316 disk_queue *dq = NULL;
2317 rdc_k_info_t *krdc = &rdc_k_info[index];
2318 rdc_u_info_t *urdc = &rdc_u_info[index];
2319 rdc_group_t *group = krdc->group;
2320 net_queue *nq = &group->ra_queue;
2321 int len = 0;
2322 int rc;
2323 int fail = 0;
2324 int offset = 0;
2325
2326 if (group == NULL || group->diskqfd == NULL) {
2327 DTRACE_PROBE(rdc_read_diskq_buf_bail1);
2328 return (NULL);
2329 }
2330
2331 dq = &group->diskq;
2332
2333 mutex_enter(QLOCK(dq));
2334 rdc_set_qbusy(dq); /* prevent disables on the queue */
2335 mutex_exit(QLOCK(dq));
2336
2337 if (_rdc_rsrv_diskq(group)) {
2338 cmn_err(CE_WARN, "!rdc_readdiskqbuf: %s reserve failed",
2339 urdc->disk_queue);
2340 mutex_enter(QLOCK(dq));
2341 rdc_clr_qbusy(dq); /* prevent disables on the queue */
2342 mutex_exit(QLOCK(dq));
2343 return (NULL);
2344 }
2345
2346 mutex_enter(QHEADLOCK(dq));
2347 mutex_enter(QLOCK(dq));
2348
2349 if (IS_STATE(urdc, RDC_DISKQ_FAILED) ||
2350 IS_STATE(urdc, RDC_LOGGING) ||
2351 (nq->qfflags & RDC_QFILLSLEEP)) {
2352 mutex_exit(QLOCK(dq));
2353 mutex_exit(QHEADLOCK(dq));
2354 DTRACE_PROBE(rdc_read_diskq_buf_bail2);
2355 goto done;
2356 }
2357
2358 /*
2359 * real corner case here, we need to let the flusher wrap first.
2360 * we've gotten too far ahead, so just delay and try again
2361 */
2362 if (IS_QSTATE(dq, QNXTIOWRAPD) && AUXQWRAP(dq)) {
2363 mutex_exit(QLOCK(dq));
2364 mutex_exit(QHEADLOCK(dq));
2365 goto done;
2366 }
2367
2368 if (QNXTIOSHLDWRAP(dq)) {
2369 #ifdef DEBUG_DISKQWRAP
2370 cmn_err(CE_NOTE, "!wrapping Q nxtio: " QDISPLAY(dq));
2371 #endif
2372 /*LINTED*/
2373 WRAPQNXTIO(dq);
2374 }
2375
2376 /* read the metainfo at q->nxt_io first */
2377 if (!QNITEMS(dq)) { /* empty */
2378
2379 if (dq->lastio->handle)
2380 (void) nsc_free_buf(dq->lastio->handle);
2381 bzero(&(*dq->lastio), sizeof (*dq->lastio));
2382 mutex_exit(QLOCK(dq));
2383 mutex_exit(QHEADLOCK(dq));
2384 DTRACE_PROBE(rdc_read_diskq_buf_bail3);
2385 goto done;
2386 }
2387
2388
2389 len = rdc_calc_len(krdc, dq);
2390
2391 if ((len <= 0) || (IS_STATE(urdc, RDC_LOGGING)) ||
2392 (IS_STATE(urdc, RDC_DISKQ_FAILED)) ||
2393 (nq->qfflags & RDC_QFILLSLEEP)) {
2394 mutex_exit(QLOCK(dq));
2395 mutex_exit(QHEADLOCK(dq));
2396 /*
2397 * a write could be trying to get on the queue, or if
2398 * the queue is really really small, a complete image
2399 * of it could be on the net queue waiting for flush.
2400 * the latter being a fairly stupid scenario and a gross
2401 * misconfiguration.. but what the heck, why make the thread
2402 * thrash around.. just pause a little here.
2403 */
2404 if (len <= 0)
2405 delay(50);
2406
2407 DTRACE_PROBE3(rdc_read_diskq_buf_bail4, int, len,
2408 int, rdc_get_vflags(urdc), int, nq->qfflags);
2409
2410 goto done;
2411 }
2412
2413 DTRACE_PROBE2(rdc_calc_len, int, len, int, (int)QNXTIO(dq));
2414
2415 #ifdef DEBUG_FLUSHER_UBERNOISE
2416 cmn_err(CE_WARN, "!CALC_LEN(%d) h:%d n%d t%d, w%d",
2417 len, QHEAD(dq), QNXTIO(dq), QTAIL(dq), QWRAP(dq));
2418 cmn_err(CE_CONT, "!qinfo: " QDISPLAYND(dq));
2419 #endif
2420 SET_QCOALBOUNDS(dq, QNXTIO(dq) + len);
2421
2422 while ((LASTQTAIL(dq) > 0) && !QWRAP(dq) &&
2423 ((QNXTIO(dq) + len) >= LASTQTAIL(dq)) &&
2424 (IS_QSTATE(dq, QTAILBUSY))) {
2425 mutex_exit(QLOCK(dq));
2426
2427 #ifdef DEBUG_FLUSHER_UBERNOISE
2428 cmn_err(CE_NOTE, "!Qtail busy delay nxtio %d len %d "
2429 "lastqtail: %d", QNXTIO(dq), len, LASTQTAIL(dq));
2430 #endif
2431 delay(20);
2432 mutex_enter(QLOCK(dq));
2433 }
2434
2435 offset = QNXTIO(dq);
2436
2437 /*
2438 * one last check to see if we have gone logging, or should.
2439 * we may have released the mutex above, so check again
2440 */
2441 if ((IS_STATE(urdc, RDC_LOGGING)) ||
2442 (IS_STATE(urdc, RDC_DISKQ_FAILED)) ||
2443 (nq->qfflags & RDC_QFILLSLEEP)) {
2444 mutex_exit(QLOCK(dq));
2445 mutex_exit(QHEADLOCK(dq));
2446 goto done;
2447 }
2448
2449 mutex_exit(QLOCK(dq));
2450 mutex_exit(QHEADLOCK(dq));
2451
2452 DTRACE_PROBE2(rdc_buf2q_preread, int, offset, int, len);
2453
2454 rc = nsc_alloc_buf(group->diskqfd, offset, len,
2455 NSC_NOCACHE | NSC_READ, &buf);
2456
2457 if (!RDC_SUCCESS(rc)) {
2458 cmn_err(CE_WARN, "!disk queue %s read failure pos %" NSC_SZFMT
2459 " len %d", urdc->disk_queue, QNXTIO(dq), len);
2460 fail++;
2461 buf = NULL;
2462 DTRACE_PROBE(rdc_read_diskq_buf_bail5);
2463 goto done;
2464 }
2465
2466 DTRACE_PROBE2(rdc_buf2q_postread, int, offset, nsc_size_t, buf->sb_len);
2467
2468 /*
2469 * convert buf to a net_queue. buf2queue will
2470 * update the QNXTIO pointer for us, based on
2471 * the last readable queue item
2472 */
2473 tmpnq = rdc_diskq_buf2queue(group, &buf, index);
2474
2475 #ifdef DEBUG_FLUSHER_UBERNOISE
2476 cmn_err(CE_NOTE, "!QBUF p: %d l: %d p+l: %d users: %d qblocks: %d ",
2477 "qitems: %d WASTED: %d", buf->sb_pos, buf->sb_len,
2478 buf->sb_pos+buf->sb_len, buf->sb_user, tmpnq?tmpnq->blocks:-1,
2479 tmpnq?tmpnq->nitems:-1,
2480 tmpnq?((buf->sb_len-tmpnq->nitems) - tmpnq->blocks):-1);
2481 #endif
2482
2483 DTRACE_PROBE3(rdc_buf2que_returned, net_queue *, tmpnq?tmpnq:0,
2484 uint64_t, tmpnq?tmpnq->nitems:0,
2485 uint_t, tmpnq?tmpnq->net_qhead->seq:0);
2486 done:
2487
2488 /* we don't need to retain the buf */
2489 if (tmpnq == NULL)
2490 if (buf) {
2491 (void) nsc_free_buf(buf);
2492 buf = NULL;
2493 }
2494
2495 rdc_trim_buf(buf, tmpnq);
2496
2497 mutex_enter(QLOCK(dq));
2498 rdc_clr_qbusy(dq);
2499 mutex_exit(QLOCK(dq));
2500
2501 _rdc_rlse_diskq(group);
2502
2503 if (fail) {
2504 rdc_fail_diskq(krdc, RDC_NOWAIT, RDC_DOLOG);
2505 tmpnq = NULL;
2506 }
2507
2508 return (tmpnq);
2509 }
2510
2511 /*
2512 * rdc_dequeue()
2513 * removes the head of the memory queue
2514 */
2515 rdc_aio_t *
2516 rdc_dequeue(rdc_k_info_t *krdc, int *rc)
2517 {
2518 net_queue *q = &krdc->group->ra_queue;
2519 disk_queue *dq = &krdc->group->diskq;
2520 rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
2521 rdc_aio_t *aio;
2522
2523 *rc = 0;
2524
2525 if (q == NULL)
2526 return (NULL);
2527
2528 mutex_enter(&q->net_qlock);
2529
2530 aio = q->net_qhead;
2531
2532 if (aio == NULL) {
2533 #ifdef DEBUG
2534 if (q->nitems != 0 || q->blocks != 0 || q->net_qtail != 0) {
2535 cmn_err(CE_PANIC,
2536 "rdc_dequeue(1): q %p, q blocks %" NSC_SZFMT
2537 " , nitems %" NSC_SZFMT ", qhead %p qtail %p",
2538 (void *) q, q->blocks, q->nitems,
2539 (void *) aio, (void *) q->net_qtail);
2540 }
2541 #endif
2542
2543 mutex_exit(&q->net_qlock);
2544
2545 if ((!IS_STATE(urdc, RDC_LOGGING)) &&
2546 (!(q->qfflags & RDC_QFILLSLEEP)) &&
2547 (!IS_STATE(urdc, RDC_SYNCING)) && (QNITEMS(dq) > 0)) {
2548 *rc = EAGAIN;
2549 }
2550
2551 goto done;
2552 }
2553
2554 /* aio remove from q */
2555
2556 q->net_qhead = aio->next;
2557 aio->next = NULL;
2558
2559 if (q->net_qtail == aio)
2560 q->net_qtail = q->net_qhead;
2561
2562 q->blocks -= aio->len;
2563 q->nitems--;
2564
2565 #ifdef DEBUG
2566 if (q->net_qhead == NULL) {
2567 if (q->nitems != 0 || q->blocks != 0 || q->net_qtail != 0) {
2568 cmn_err(CE_PANIC, "rdc_dequeue(2): q %p, q blocks %"
2569 NSC_SZFMT " nitems %" NSC_SZFMT
2570 " , qhead %p qtail %p",
2571 (void *) q, q->blocks, q->nitems,
2572 (void *) q->net_qhead, (void *) q->net_qtail);
2573 }
2574 }
2575 #endif
2576 mutex_exit(&q->net_qlock);
2577 done:
2578
2579 mutex_enter(&q->net_qlock);
2580
2581 if (rdc_qfill_shldwakeup(krdc))
2582 cv_broadcast(&q->qfcv);
2583
2584 /*
2585 * clear EAGAIN if
2586 * logging or q filler thread is sleeping or stopping altogether
2587 * or if q filler thread is dead already
2588 * or if syncing, this will return a null aio, with no error code set
2589 * telling the flusher to die
2590 */
2591 if (*rc == EAGAIN) {
2592 if (IS_STATE(urdc, RDC_LOGGING) ||
2593 (q->qfflags & (RDC_QFILLSLEEP | RDC_QFILLSTOP)) ||
2594 (IS_QSTATE(dq, (RDC_QDISABLEPEND | RDC_STOPPINGFLUSH))) ||
2595 (q->qfill_sleeping == RDC_QFILL_DEAD) ||
2596 (IS_STATE(urdc, RDC_SYNCING)))
2597 *rc = 0;
2598 }
2599
2600 mutex_exit(&q->net_qlock);
2601
2602 return (aio);
2603
2604 }
2605
2606 /*
2607 * rdc_qfill_shldsleep()
2608 * returns 1 if the qfilling code should cv_wait() 0 if not.
2609 * reasons for going into cv_wait();
2610 * there is nothing in the diskq to flush to mem.
2611 * the memory queue has gotten too big and needs more flushing attn.
2612 */
2613 int
2614 rdc_qfill_shldsleep(rdc_k_info_t *krdc)
2615 {
2616 net_queue *nq = &krdc->group->ra_queue;
2617 disk_queue *dq = &krdc->group->diskq;
2618 rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
2619
2620 ASSERT(MUTEX_HELD(&nq->net_qlock));
2621
2622 if (!RDC_IS_DISKQ(krdc->group))
2623 return (1);
2624
2625 if (nq->qfflags & RDC_QFILLSLEEP) {
2626 #ifdef DEBUG_DISKQ_NOISY
2627 cmn_err(CE_NOTE, "!Sleeping diskq->memq flusher: QFILLSLEEP idx: %d",
2628 krdc->index);
2629 #endif
2630 return (1);
2631 }
2632
2633 if (IS_STATE(urdc, RDC_LOGGING) || IS_STATE(urdc, RDC_SYNCING)) {
2634 #ifdef DEBUG_DISKQ_NOISY
2635 cmn_err(CE_NOTE, "!Sleeping diskq->memq flusher: Sync|Log (0x%x)"
2636 " idx: %d", rdc_get_vflags(urdc), urdc->index);
2637 #endif
2638 return (1);
2639 }
2640
2641 mutex_enter(QLOCK(dq));
2642 if ((QNXTIO(dq) == QTAIL(dq)) && !IS_QSTATE(dq, RDC_QFULL)) {
2643 #ifdef DEBUG_DISKQ_NOISY
2644 cmn_err(CE_NOTE, "!Sleeping diskq->memq flusher: QEMPTY");
2645 #endif
2646 mutex_exit(QLOCK(dq));
2647 return (1);
2648 }
2649 mutex_exit(QLOCK(dq));
2650
2651 if (nq->blocks >= RDC_MAX_QBLOCKS) {
2652 nq->hwmhit = 1;
2653 /* stuck flushers ? */
2654 #ifdef DEBUG_DISKQ_NOISY
2655 cmn_err(CE_NOTE, "!Sleeping diskq->memq flusher: memq full:"
2656 " seq: %d seqack %d", krdc->group->seq,
2657 krdc->group->seqack);
2658 #endif
2659 return (1);
2660 }
2661
2662 return (0);
2663 }
2664
2665 /*
2666 * rdc_join_netqueues(a, b)
2667 * appends queue b to queue a updating all the queue info
2668 * as it is assumed queue a is the important one,
2669 * it's mutex must be held. no one can add to queue b
2670 */
2671 void
2672 rdc_join_netqueues(net_queue *q, net_queue *tmpq)
2673 {
2674 ASSERT(MUTEX_HELD(&q->net_qlock));
2675
2676 if (q->net_qhead == NULL) { /* empty */
2677 #ifdef DEBUG
2678 if (q->blocks != 0 || q->nitems != 0) {
2679 cmn_err(CE_PANIC, "rdc filler: q %p, qhead 0, "
2680 " q blocks %" NSC_SZFMT ", nitems %" NSC_SZFMT,
2681 (void *) q, q->blocks, q->nitems);
2682 }
2683 #endif
2684 q->net_qhead = tmpq->net_qhead;
2685 q->net_qtail = tmpq->net_qtail;
2686 q->nitems = tmpq->nitems;
2687 q->blocks = tmpq->blocks;
2688 } else {
2689 q->net_qtail->next = tmpq->net_qhead;
2690 q->net_qtail = tmpq->net_qtail;
2691 q->nitems += tmpq->nitems;
2692 q->blocks += tmpq->blocks;
2693 }
2694
2695 if (q->nitems > q->nitems_hwm) {
2696 q->nitems_hwm = q->nitems;
2697 }
2698
2699 if (q->blocks > q->blocks_hwm) {
2700 q->blocks_hwm = q->blocks;
2701 }
2702 }
2703
2704 /*
2705 * rdc_qfiller_thr() single thread that moves
2706 * data from the diskq to a memory queue for
2707 * the flusher to pick up.
2708 */
2709 void
2710 rdc_qfiller_thr(rdc_k_info_t *krdc)
2711 {
2712 rdc_group_t *grp = krdc->group;
2713 rdc_u_info_t *urdc = &rdc_u_info[krdc->index];
2714 net_queue *q = &grp->ra_queue;
2715 net_queue *tmpq = NULL;
2716 int index = krdc->index;
2717
2718 q->qfill_sleeping = RDC_QFILL_AWAKE;
2719 while (!(q->qfflags & RDC_QFILLSTOP)) {
2720 if (!RDC_IS_DISKQ(grp) ||
2721 IS_STATE(urdc, RDC_LOGGING) ||
2722 IS_STATE(urdc, RDC_DISKQ_FAILED) ||
2723 (q->qfflags & RDC_QFILLSLEEP)) {
2724 goto nulltmpq;
2725 }
2726
2727 DTRACE_PROBE(qfiller_top);
2728 tmpq = rdc_read_diskq_buf(index);
2729
2730 if (tmpq == NULL)
2731 goto nulltmpq;
2732
2733 if ((q->qfflags & RDC_QFILLSLEEP) ||
2734 IS_STATE(urdc, RDC_LOGGING)) {
2735 rdc_discard_tmpq(tmpq);
2736 goto nulltmpq;
2737 }
2738
2739 mutex_enter(&q->net_qlock);
2740
2741 /* race with log, redundant yet paranoid */
2742 if ((q->qfflags & RDC_QFILLSLEEP) ||
2743 IS_STATE(urdc, RDC_LOGGING)) {
2744 rdc_discard_tmpq(tmpq);
2745 mutex_exit(&q->net_qlock);
2746 goto nulltmpq;
2747 }
2748
2749
2750 rdc_join_netqueues(q, tmpq);
2751 kmem_free(tmpq, sizeof (*tmpq));
2752 tmpq = NULL;
2753
2754 mutex_exit(&q->net_qlock);
2755 nulltmpq:
2756 /*
2757 * sleep for a while if we can.
2758 * the enqueuing or flushing code will
2759 * wake us if if necessary.
2760 */
2761 mutex_enter(&q->net_qlock);
2762 while (rdc_qfill_shldsleep(krdc)) {
2763 q->qfill_sleeping = RDC_QFILL_ASLEEP;
2764 DTRACE_PROBE(qfiller_sleep);
2765 cv_wait(&q->qfcv, &q->net_qlock);
2766 DTRACE_PROBE(qfiller_wakeup);
2767 q->qfill_sleeping = RDC_QFILL_AWAKE;
2768 if (q->qfflags & RDC_QFILLSTOP) {
2769 #ifdef DEBUG_DISKQ
2770 cmn_err(CE_NOTE,
2771 "!rdc_qfiller_thr: recieved kill signal");
2772 #endif
2773 mutex_exit(&q->net_qlock);
2774 goto done;
2775 }
2776 }
2777 mutex_exit(&q->net_qlock);
2778
2779 DTRACE_PROBE(qfiller_bottom);
2780 }
2781 done:
2782 DTRACE_PROBE(qfiller_done);
2783 q->qfill_sleeping = RDC_QFILL_DEAD; /* the big sleep */
2784
2785 #ifdef DEBUG
2786 cmn_err(CE_NOTE, "!rdc_qfiller_thr stopping");
2787 #endif
2788 q->qfflags &= ~RDC_QFILLSTOP;
2789
2790 }
2791
2792 int
2793 _rdc_add_diskq(int index, char *diskq)
2794 {
2795 rdc_k_info_t *krdc, *kp;
2796 rdc_u_info_t *urdc, *up;
2797 rdc_group_t *group;
2798 int rc;
2799
2800 krdc = &rdc_k_info[index];
2801 urdc = &rdc_u_info[index];
2802 group = krdc->group;
2803
2804 if (!diskq || urdc->disk_queue[0]) { /* how'd that happen? */
2805 #ifdef DEBUG
2806 cmn_err(CE_WARN, "!NULL diskq in _rdc_add_diskq");
2807 #endif
2808 rc = -1;
2809 goto fail;
2810 }
2811
2812 /* if the enable fails, this is bzero'ed */
2813 (void) strncpy(urdc->disk_queue, diskq, NSC_MAXPATH);
2814 group->flags &= ~RDC_MEMQUE;
2815 group->flags |= RDC_DISKQUE;
2816
2817 #ifdef DEBUG
2818 cmn_err(CE_NOTE, "!adding diskq to group %s", urdc->group_name);
2819 #endif
2820 mutex_enter(&rdc_conf_lock);
2821 rc = rdc_enable_diskq(krdc);
2822 mutex_exit(&rdc_conf_lock);
2823
2824 if (rc == RDC_EQNOADD) {
2825 goto fail;
2826 }
2827
2828 RDC_ZERO_BITREF(krdc);
2829 for (kp = krdc->group_next; kp != krdc; kp = kp->group_next) {
2830 up = &rdc_u_info[kp->index];
2831 (void) strncpy(up->disk_queue, diskq, NSC_MAXPATH);
2832 /* size lives in the diskq structure, already set by enable */
2833 RDC_ZERO_BITREF(kp);
2834 }
2835
2836 fail:
2837 return (rc);
2838
2839 }
2840
2841 /*
2842 * add a diskq to an existing set/group
2843 */
2844 int
2845 rdc_add_diskq(rdc_config_t *uparms, spcs_s_info_t kstatus)
2846 {
2847 char *diskq;
2848 int rc;
2849 int index;
2850 rdc_k_info_t *krdc, *this;
2851 rdc_u_info_t *urdc;
2852 rdc_group_t *group;
2853 nsc_size_t vol_size = 0;
2854 nsc_size_t req_size = 0;
2855
2856 mutex_enter(&rdc_conf_lock);
2857 index = rdc_lookup_byname(uparms->rdc_set);
2858 mutex_exit(&rdc_conf_lock);
2859 if (index < 0) {
2860 spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
2861 uparms->rdc_set->secondary.file);
2862 rc = RDC_EALREADY;
2863 goto failed;
2864 }
2865 urdc = &rdc_u_info[index];
2866 krdc = &rdc_k_info[index];
2867 this = &rdc_k_info[index];
2868 group = krdc->group;
2869 diskq = uparms->rdc_set->disk_queue;
2870
2871 if (!IS_ASYNC(urdc)) {
2872 spcs_s_add(kstatus, RDC_EQWRONGMODE, urdc->primary.intf,
2873 urdc->primary.file, urdc->secondary.intf,
2874 urdc->secondary.file);
2875 rc = RDC_EQNOQUEUE;
2876 goto failed;
2877 }
2878
2879 do {
2880 if (!IS_STATE(urdc, RDC_LOGGING)) {
2881 spcs_s_add(kstatus, RDC_EQNOTLOGGING,
2882 uparms->rdc_set->disk_queue);
2883 rc = RDC_EQNOTLOGGING;
2884 goto failed;
2885 }
2886 /* make sure that we have enough bitmap vol */
2887 req_size = RDC_BITMAP_FBA + FBA_LEN(krdc->bitmap_size);
2888 req_size += FBA_LEN(krdc->bitmap_size * BITS_IN_BYTE);
2889
2890 rc = _rdc_rsrv_devs(krdc, RDC_BMP, RDC_INTERNAL);
2891
2892 if (!RDC_SUCCESS(rc)) {
2893 cmn_err(CE_WARN,
2894 "!rdc_open_diskq: Bitmap reserve failed");
2895 spcs_s_add(kstatus, RDC_EBITMAP,
2896 urdc->primary.bitmap);
2897 rc = RDC_EBITMAP;
2898 goto failed;
2899 }
2900
2901 (void) nsc_partsize(krdc->bitmapfd, &vol_size);
2902
2903 _rdc_rlse_devs(krdc, RDC_BMP);
2904
2905 if (vol_size < req_size) {
2906 spcs_s_add(kstatus, RDC_EBITMAP2SMALL,
2907 urdc->primary.bitmap);
2908 rc = RDC_EBITMAP2SMALL;
2909 goto failed;
2910 }
2911
2912 krdc = krdc->group_next;
2913 urdc = &rdc_u_info[krdc->index];
2914
2915 } while (krdc != this);
2916
2917 if (urdc->disk_queue[0] != '\0') {
2918 spcs_s_add(kstatus, RDC_EQALREADY, urdc->primary.intf,
2919 urdc->primary.file, urdc->secondary.intf,
2920 urdc->secondary.file);
2921 rc = RDC_EQALREADY;
2922 goto failed;
2923 }
2924
2925 if (uparms->options & RDC_OPT_SECONDARY) { /* how'd we get here? */
2926 spcs_s_add(kstatus, RDC_EQWRONGMODE);
2927 rc = RDC_EQWRONGMODE;
2928 goto failed;
2929 }
2930
2931 mutex_enter(&rdc_conf_lock);
2932 if (rdc_diskq_inuse(uparms->rdc_set, uparms->rdc_set->disk_queue)) {
2933 spcs_s_add(kstatus, RDC_EDISKQINUSE,
2934 uparms->rdc_set->disk_queue);
2935 rc = RDC_EDISKQINUSE;
2936 mutex_exit(&rdc_conf_lock);
2937 goto failed;
2938 }
2939 mutex_exit(&rdc_conf_lock);
2940
2941 rdc_group_enter(krdc);
2942 rc = _rdc_add_diskq(urdc->index, diskq);
2943 if (rc < 0 || rc == RDC_EQNOADD) {
2944 group->flags &= ~RDC_DISKQUE;
2945 group->flags |= RDC_MEMQUE;
2946 spcs_s_add(kstatus, RDC_EQNOADD, uparms->rdc_set->disk_queue);
2947 rc = RDC_EQNOADD;
2948 }
2949 rdc_group_exit(krdc);
2950 failed:
2951 return (rc);
2952 }
2953
2954 int
2955 _rdc_init_diskq(rdc_k_info_t *krdc)
2956 {
2957 rdc_group_t *group = krdc->group;
2958 disk_queue *q = &group->diskq;
2959
2960 rdc_init_diskq_header(group, &group->diskq.disk_hdr);
2961 SET_QNXTIO(q, QHEAD(q));
2962
2963 if (rdc_stamp_diskq(krdc, 0, RDC_NOLOG) < 0)
2964 goto fail;
2965
2966 return (0);
2967 fail:
2968 return (-1);
2969 }
2970
2971 /*
2972 * inititalize the disk queue. This is a destructive
2973 * operation that will not check for emptiness of the queue.
2974 */
2975 int
2976 rdc_init_diskq(rdc_config_t *uparms, spcs_s_info_t kstatus)
2977 {
2978 int rc = 0;
2979 int index;
2980 rdc_k_info_t *krdc, *kp;
2981 rdc_u_info_t *urdc, *up;
2982 rdc_set_t *uset;
2983 rdc_group_t *group;
2984 disk_queue *qp;
2985
2986 uset = uparms->rdc_set;
2987
2988 mutex_enter(&rdc_conf_lock);
2989 index = rdc_lookup_byname(uset);
2990 mutex_exit(&rdc_conf_lock);
2991 if (index < 0) {
2992 spcs_s_add(kstatus, RDC_EALREADY, uset->primary.file,
2993 uset->secondary.file);
2994 rc = RDC_EALREADY;
2995 goto fail;
2996 }
2997
2998 krdc = &rdc_k_info[index];
2999 urdc = &rdc_u_info[index];
3000 group = krdc->group;
3001 qp = &group->diskq;
3002
3003 if (!IS_STATE(urdc, RDC_SYNCING) && !IS_STATE(urdc, RDC_LOGGING)) {
3004 spcs_s_add(kstatus, RDC_EQUEISREP, urdc->disk_queue);
3005 rc = RDC_EQUEISREP;
3006 goto fail;
3007 }
3008
3009 /*
3010 * a couple of big "ifs" here. in the first implementation
3011 * neither of these will be possible. This will come into
3012 * play when we persist the queue across reboots
3013 */
3014 if (!(uparms->options & RDC_OPT_FORCE_QINIT)) {
3015 if (!QEMPTY(qp)) {
3016 if (group->rdc_writer) {
3017 spcs_s_add(kstatus, RDC_EQFLUSHING,
3018 urdc->disk_queue);
3019 rc = RDC_EQFLUSHING;
3020 } else {
3021 spcs_s_add(kstatus, RDC_EQNOTEMPTY,
3022 urdc->disk_queue);
3023 rc = RDC_EQNOTEMPTY;
3024 }
3025 goto fail;
3026 }
3027 }
3028
3029 mutex_enter(QLOCK(qp));
3030 if (_rdc_init_diskq(krdc) < 0) {
3031 mutex_exit(QLOCK(qp));
3032 goto fail;
3033 }
3034 rdc_dump_iohdrs(qp);
3035
3036 rdc_group_enter(krdc);
3037
3038 rdc_clr_flags(urdc, RDC_QUEUING);
3039 for (kp = krdc->group_next; kp != krdc; kp = kp->group_next) {
3040 up = &rdc_u_info[kp->index];
3041 rdc_clr_flags(up, RDC_QUEUING);
3042 }
3043 rdc_group_exit(krdc);
3044
3045 mutex_exit(QLOCK(qp));
3046
3047 return (0);
3048 fail:
3049 /* generic queue failure */
3050 if (!rc) {
3051 spcs_s_add(kstatus, RDC_EQINITFAIL, urdc->disk_queue);
3052 rc = RDC_EQINITFAIL;
3053 }
3054
3055 return (rc);
3056 }
3057
3058 int
3059 _rdc_kill_diskq(rdc_u_info_t *urdc)
3060 {
3061 rdc_k_info_t *krdc = &rdc_k_info[urdc->index];
3062 rdc_group_t *group = krdc->group;
3063 disk_queue *q = &group->diskq;
3064 rdc_u_info_t *up;
3065 rdc_k_info_t *p;
3066
3067 group->flags |= RDC_DISKQ_KILL;
3068 #ifdef DEBUG
3069 cmn_err(CE_NOTE, "!disabling disk queue %s", urdc->disk_queue);
3070 #endif
3071
3072 mutex_enter(QLOCK(q));
3073 rdc_init_diskq_header(group, &q->disk_hdr);
3074 rdc_dump_iohdrs(q);
3075
3076 /*
3077 * nsc_close the queue and zero out the queue name
3078 */
3079 rdc_wait_qbusy(q);
3080 rdc_close_diskq(group);
3081 mutex_exit(QLOCK(q));
3082 SET_QSIZE(q, 0);
3083 rdc_clr_flags(urdc, RDC_DISKQ_FAILED);
3084 bzero(urdc->disk_queue, NSC_MAXPATH);
3085 for (p = krdc->group_next; p != krdc; p = p->group_next) {
3086 up = &rdc_u_info[p->index];
3087 rdc_clr_flags(up, RDC_DISKQ_FAILED);
3088 bzero(up->disk_queue, NSC_MAXPATH);
3089 }
3090
3091 #ifdef DEBUG
3092 cmn_err(CE_NOTE, "!_rdc_kill_diskq: enabling memory queue");
3093 #endif
3094 group->flags &= ~(RDC_DISKQUE|RDC_DISKQ_KILL);
3095 group->flags |= RDC_MEMQUE;
3096 return (0);
3097 }
3098
3099 /*
3100 * remove this diskq regardless of whether it is draining or not
3101 * stops the flusher by invalidating the qdata (ie, instant empty)
3102 * remove the disk qeueue from the group, leaving the group with a memory
3103 * queue.
3104 */
3105 int
3106 rdc_kill_diskq(rdc_config_t *uparms, spcs_s_info_t kstatus)
3107 {
3108 int rc;
3109 int index;
3110 rdc_u_info_t *urdc;
3111 rdc_k_info_t *krdc;
3112 rdc_set_t *rdc_set = uparms->rdc_set;
3113
3114 mutex_enter(&rdc_conf_lock);
3115 index = rdc_lookup_byname(uparms->rdc_set);
3116 mutex_exit(&rdc_conf_lock);
3117
3118 if (index < 0) {
3119 spcs_s_add(kstatus, RDC_EALREADY, rdc_set->primary.file,
3120 rdc_set->secondary.file);
3121 rc = RDC_EALREADY;
3122 goto failed;
3123 }
3124
3125 urdc = &rdc_u_info[index];
3126 krdc = &rdc_k_info[index];
3127
3128 if (!RDC_IS_DISKQ(krdc->group)) {
3129 spcs_s_add(kstatus, RDC_EQNOQUEUE, rdc_set->primary.intf,
3130 rdc_set->primary.file, rdc_set->secondary.intf,
3131 rdc_set->secondary.file);
3132 rc = RDC_EQNOQUEUE;
3133 goto failed;
3134 }
3135
3136 /*
3137 * if (!IS_STATE(urdc, RDC_LOGGING)) {
3138 * spcs_s_add(kstatus, RDC_EQNOTLOGGING,
3139 * uparms->rdc_set->disk_queue);
3140 * rc = RDC_EQNOTLOGGING;
3141 * goto failed;
3142 * }
3143 */
3144 rdc_unintercept_diskq(krdc->group); /* stop protecting queue */
3145 rdc_group_enter(krdc); /* to prevent further flushing */
3146 rc = _rdc_kill_diskq(urdc);
3147 rdc_group_exit(krdc);
3148
3149 failed:
3150 return (rc);
3151 }
3152
3153 /*
3154 * remove a diskq from a group.
3155 * removal of a diskq from a set, or rather
3156 * a set from a queue, is done by reconfigging out
3157 * of the group. This removes the diskq from a whole
3158 * group and replaces it with a memory based queue
3159 */
3160 #define NUM_RETRIES 15 /* Number of retries to wait if no progress */
3161 int
3162 rdc_rem_diskq(rdc_config_t *uparms, spcs_s_info_t kstatus)
3163 {
3164 int index;
3165 rdc_u_info_t *urdc;
3166 rdc_k_info_t *krdc;
3167 rdc_k_info_t *this;
3168 volatile rdc_group_t *group;
3169 volatile disk_queue *diskq;
3170 int threads, counter;
3171 long blocks;
3172
3173 mutex_enter(&rdc_conf_lock);
3174 index = rdc_lookup_byname(uparms->rdc_set);
3175 mutex_exit(&rdc_conf_lock);
3176 if (index < 0) {
3177 spcs_s_add(kstatus, RDC_EALREADY, uparms->rdc_set->primary.file,
3178 uparms->rdc_set->secondary.file);
3179 return (RDC_EALREADY);
3180 }
3181
3182 urdc = &rdc_u_info[index];
3183 this = &rdc_k_info[index];
3184 krdc = &rdc_k_info[index];
3185
3186 do {
3187 if (!IS_STATE(urdc, RDC_LOGGING)) {
3188 spcs_s_add(kstatus, RDC_EQNOTLOGGING,
3189 urdc->disk_queue);
3190 return (RDC_EQNOTLOGGING);
3191 }
3192 krdc = krdc->group_next;
3193 urdc = &rdc_u_info[krdc->index];
3194
3195 } while (krdc != this);
3196
3197 /*
3198 * If there is no group or diskq configured, we can leave now
3199 */
3200 if (!(group = krdc->group) || !(diskq = &group->diskq))
3201 return (0);
3202
3203
3204 /*
3205 * Wait if not QEMPTY or threads still active
3206 */
3207 counter = 0;
3208 while (!QEMPTY(diskq) || group->rdc_thrnum) {
3209
3210 /*
3211 * Capture counters to determine if progress is being made
3212 */
3213 blocks = QBLOCKS(diskq);
3214 threads = group->rdc_thrnum;
3215
3216 /*
3217 * Wait
3218 */
3219 delay(HZ);
3220
3221 /*
3222 * Has the group or disk queue gone away while delayed?
3223 */
3224 if (!(group = krdc->group) || !(diskq = &group->diskq))
3225 return (0);
3226
3227 /*
3228 * Are we still seeing progress?
3229 */
3230 if (blocks == QBLOCKS(diskq) && threads == group->rdc_thrnum) {
3231 /*
3232 * No progress see, decrement retry counter
3233 */
3234 if (counter++ > NUM_RETRIES) {
3235 /*
3236 * No progress seen, increment retry counter
3237 */
3238 int rc = group->rdc_thrnum ?
3239 RDC_EQFLUSHING : RDC_EQNOTEMPTY;
3240 spcs_s_add(kstatus, rc, urdc->disk_queue);
3241 return (rc);
3242 }
3243 } else {
3244 /*
3245 * Reset counter, as we've made progress
3246 */
3247 counter = 0;
3248 }
3249 }
3250
3251 return (0);
3252 }