/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
 * Use is subject to license terms.
 */

/*
 * STREAMS Buffering module
 *
 * This STREAMS module collects incoming messages from modules below
 * it on the stream and buffers them up into a smaller number of
 * aggregated messages. Its main purpose is to reduce overhead by
 * cutting down on the number of read (or getmsg) calls its client
 * user process makes.
 * - only M_DATA is buffered.
 * - multithreading assumes the module is configured D_MTQPAIR.
 * - packets are lost only if the SB_NO_HEADER flag is clear and buffer
 *   allocation fails.
 * - in-order message transmission is enforced for all messages other
 *   than high-priority messages.
 * - zero-length messages on the read side are not passed up the
 *   stream but are used internally for synchronization.
 * FLAGS:
 * - SB_NO_PROTO_CVT - no conversion of M_PROTO messages to M_DATA.
 *   (conversion is the default for backwards compatibility,
 *   hence the negative logic).
 * - SB_NO_HEADER - no headers in buffered data.
 *   (adding headers is the default for backwards compatibility,
 *   hence the negative logic).
 * - SB_DEFER_CHUNK - provides improved response time in question-answer
 *   applications. Buffering is not enabled until the second message
 *   is received on the read side within the sb_ticks interval.
 *   This option is often used in combination with SB_SEND_ON_WRITE.
 * - SB_SEND_ON_WRITE - a write message causes any pending buffered read
 *   data to be sent upstream immediately.
 * - SB_NO_DROPS - bufmod behaves transparently in flow control and
 *   propagates the blocked-flow condition downstream. If this flag is
 *   clear (the default), messages are dropped when the upstream flow
 *   is blocked.
 */
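
/*
 * Usage sketch (userland, for illustration only; not part of this
 * module): a client typically pushes bufmod onto a stream and then
 * sets the snapshot, chunk, timeout, and flag parameters with the
 * SBIOC* ioctls declared in <sys/bufmod.h>. The device name and the
 * parameter values below are illustrative assumptions, not
 * requirements.
 *
 *	int fd = open("/dev/le", O_RDWR);	-- hypothetical device
 *	struct timeval t = { 0, 50000 };	-- 50 ms timeout
 *	uint_t snap = 192, chunk = 16384, flags = SB_NO_DROPS;
 *
 *	(void) ioctl(fd, I_PUSH, "bufmod");
 *	(void) ioctl(fd, SBIOCSSNAP, &snap);
 *	(void) ioctl(fd, SBIOCSCHUNK, &chunk);
 *	(void) ioctl(fd, SBIOCSTIME, &t);
 *	(void) ioctl(fd, SBIOCSFLAGS, &flags);
 *
 * Each read then returns a whole chunk; unless SB_NO_HEADER is set,
 * every buffered message is preceded by a struct sb_hdr (see
 * sbaddmsg() below).
 */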


#include <sys/types.h>
#include <sys/errno.h>
#include <sys/debug.h>
#include <sys/stropts.h>
#include <sys/time.h>
#include <sys/stream.h>
#include <sys/conf.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>
#include <sys/kmem.h>
#include <sys/strsun.h>
#include <sys/bufmod.h>
#include <sys/modctl.h>
#include <sys/isa_defs.h>

/*
 * Per-Stream state information.
 *
 * If sb_ticks is negative, we don't deliver chunks until they're
 * full. If it's zero, we deliver every packet as it arrives. (In
 * this case we force sb_chunk to zero, to make the implementation
 * easier.) Otherwise, sb_ticks gives the number of ticks in a
 * buffering interval. The interval begins when a read-side data
 * message is received and a timeout is not active. If sb_snap is
 * zero, the message is not truncated.
 */
struct sb {
	queue_t	*sb_rq;		/* our rq */
	mblk_t	*sb_mp;		/* partial chunk */
	mblk_t	*sb_head;	/* pre-allocated space for the next header */
	mblk_t	*sb_tail;	/* first mblk of last message appended */
	uint_t	sb_mlen;	/* sb_mp length */
	uint_t	sb_mcount;	/* input msg count in sb_mp */
	uint_t	sb_chunk;	/* max chunk size */
	clock_t	sb_ticks;	/* timeout interval */
	timeout_id_t sb_timeoutid; /* qtimeout() id */
	uint_t	sb_drops;	/* cumulative # discarded msgs */
	uint_t	sb_snap;	/* snapshot length */
	uint_t	sb_flags;	/* flags field */
	uint_t	sb_state;	/* state variable */
};
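
/*
 * For example (assuming hz = 100): an SBIOCSTIME interval of 50 ms
 * yields sb_ticks = TIMEVAL_TO_TICK(&tv) = 5, so a chunk is closed off
 * at most 50 ms after the first message arrives; a zero interval sets
 * sb_ticks = 0, which also forces sb_chunk to zero so that every
 * message is delivered as it arrives.
 */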

/*
 * Function prototypes.
 */
static int sbopen(queue_t *, dev_t *, int, int, cred_t *);
static int sbclose(queue_t *, int, cred_t *);
static void sbwput(queue_t *, mblk_t *);
static void sbrput(queue_t *, mblk_t *);
static void sbrsrv(queue_t *);
static void sbioctl(queue_t *, mblk_t *);
static void sbaddmsg(queue_t *, mblk_t *);
static void sbtick(void *);
static void sbclosechunk(struct sb *);
static void sbsendit(queue_t *, mblk_t *);

static struct module_info sb_minfo = {
	21,		/* mi_idnum */
	"bufmod",	/* mi_idname */
	0,		/* mi_minpsz */
	INFPSZ,		/* mi_maxpsz */
	1,		/* mi_hiwat */
	0		/* mi_lowat */
};

static struct qinit sb_rinit = {
	(int (*)())sbrput,	/* qi_putp */
	(int (*)())sbrsrv,	/* qi_srvp */
	sbopen,			/* qi_qopen */
	sbclose,		/* qi_qclose */
	NULL,			/* qi_qadmin */
	&sb_minfo,		/* qi_minfo */
	NULL			/* qi_mstat */
};

static struct qinit sb_winit = {
	(int (*)())sbwput,	/* qi_putp */
	NULL,			/* qi_srvp */
	NULL,			/* qi_qopen */
	NULL,			/* qi_qclose */
	NULL,			/* qi_qadmin */
	&sb_minfo,		/* qi_minfo */
	NULL			/* qi_mstat */
};

static struct streamtab sb_info = {
	&sb_rinit,	/* st_rdinit */
	&sb_winit,	/* st_wrinit */
	NULL,		/* st_muxrinit */
	NULL		/* st_muxwinit */
};


/*
 * This is the loadable module wrapper.
 */

static struct fmodsw fsw = {
	"bufmod",
	&sb_info,
	D_MTQPAIR | D_MP
};

/*
 * Module linkage information for the kernel.
 */

static struct modlstrmod modlstrmod = {
	&mod_strmodops, "streams buffer mod", &fsw
};

static struct modlinkage modlinkage = {
	MODREV_1, { &modlstrmod, NULL }
};


int
_init(void)
{
	return (mod_install(&modlinkage));
}

int
_fini(void)
{
	return (mod_remove(&modlinkage));
}

int
_info(struct modinfo *modinfop)
{
	return (mod_info(&modlinkage, modinfop));
}


/* ARGSUSED */
static int
sbopen(queue_t *rq, dev_t *dev, int oflag, int sflag, cred_t *crp)
{
	struct sb *sbp;
	ASSERT(rq);

	if (sflag != MODOPEN)
		return (EINVAL);

	if (rq->q_ptr)
		return (0);

	/*
	 * Allocate and initialize per-Stream structure.
	 */
	sbp = kmem_alloc(sizeof (struct sb), KM_SLEEP);
	sbp->sb_rq = rq;
	sbp->sb_ticks = -1;
	sbp->sb_chunk = SB_DFLT_CHUNK;
	sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
	sbp->sb_mlen = 0;
	sbp->sb_mcount = 0;
	sbp->sb_timeoutid = 0;
	sbp->sb_drops = 0;
	sbp->sb_snap = 0;
	sbp->sb_flags = 0;
	sbp->sb_state = 0;

	rq->q_ptr = WR(rq)->q_ptr = sbp;

	qprocson(rq);

	return (0);
}

/* ARGSUSED1 */
static int
sbclose(queue_t *rq, int flag, cred_t *credp)
{
	struct sb *sbp = (struct sb *)rq->q_ptr;

	ASSERT(sbp);

	qprocsoff(rq);
	/*
	 * Cancel any outstanding timeout.
	 */
	if (sbp->sb_timeoutid != 0) {
		(void) quntimeout(rq, sbp->sb_timeoutid);
		sbp->sb_timeoutid = 0;
	}
	/*
	 * Free the current chunk.
	 */
	if (sbp->sb_mp) {
		freemsg(sbp->sb_mp);
		sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
		sbp->sb_mlen = 0;
	}

	/*
	 * Free the per-Stream structure.
	 */
	kmem_free((caddr_t)sbp, sizeof (struct sb));
	rq->q_ptr = WR(rq)->q_ptr = NULL;

	return (0);
}

/*
 * The correction factor is introduced to compensate for whatever
 * assumptions the modules below have made about how much traffic is
 * flowing through the stream, and for the fact that bufmod may be
 * snipping messages to the sb_snap length.
 */
#define	SNIT_HIWAT(msgsize, fudge)	((4 * msgsize * fudge) + 512)
#define	SNIT_LOWAT(msgsize, fudge)	((2 * msgsize * fudge) + 256)
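
/*
 * For example, an SBIOCSCHUNK chunk size of 8192 (fudge factor 1) sets
 * so_hiwat = 4 * 8192 * 1 + 512 = 33280 and
 * so_lowat = 2 * 8192 * 1 + 256 = 16640
 * on the stream head read queue.
 */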


static void
sbioc(queue_t *wq, mblk_t *mp)
{
	struct iocblk *iocp;
	struct sb *sbp = (struct sb *)wq->q_ptr;
	clock_t ticks;
	mblk_t *mop;

	iocp = (struct iocblk *)mp->b_rptr;

	switch (iocp->ioc_cmd) {
	case SBIOCGCHUNK:
	case SBIOCGSNAP:
	case SBIOCGFLAGS:
	case SBIOCGTIME:
		miocack(wq, mp, 0, 0);
		return;

	case SBIOCSTIME:
#ifdef _SYSCALL32_IMPL
		if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
			struct timeval32 *t32;

			t32 = (struct timeval32 *)mp->b_cont->b_rptr;
			if (t32->tv_sec < 0 || t32->tv_usec < 0) {
				miocnak(wq, mp, 0, EINVAL);
				break;
			}
			ticks = TIMEVAL_TO_TICK(t32);
		} else
#endif /* _SYSCALL32_IMPL */
		{
			struct timeval *tb;

			tb = (struct timeval *)mp->b_cont->b_rptr;

			if (tb->tv_sec < 0 || tb->tv_usec < 0) {
				miocnak(wq, mp, 0, EINVAL);
				break;
			}
			ticks = TIMEVAL_TO_TICK(tb);
		}
		sbp->sb_ticks = ticks;
		if (ticks == 0)
			sbp->sb_chunk = 0;
		miocack(wq, mp, 0, 0);
		sbclosechunk(sbp);
		return;

	case SBIOCSCHUNK:
		/*
		 * Set up the high/low water marks on the stream head
		 * read queue. We're unlikely to run out of resources
		 * here; fix at a later date if needed.
		 */
		if ((mop = allocb(sizeof (struct stroptions),
		    BPRI_MED)) != NULL) {
			struct stroptions *sop;
			uint_t chunk;

			chunk = *(uint_t *)mp->b_cont->b_rptr;
			mop->b_datap->db_type = M_SETOPTS;
			mop->b_wptr += sizeof (struct stroptions);
			sop = (struct stroptions *)mop->b_rptr;
			sop->so_flags = SO_HIWAT | SO_LOWAT;
			sop->so_hiwat = SNIT_HIWAT(chunk, 1);
			sop->so_lowat = SNIT_LOWAT(chunk, 1);
			qreply(wq, mop);
		}

		sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
		miocack(wq, mp, 0, 0);
		sbclosechunk(sbp);
		return;

	case SBIOCSFLAGS:
		sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
		miocack(wq, mp, 0, 0);
		return;

	case SBIOCSSNAP:
		/*
		 * If chunking, don't worry about the effect of snapshot
		 * truncation on stream head flow control, since it has a
		 * relatively small bearing on the data rate onto the
		 * stream head.
		 */
		if (!sbp->sb_chunk) {
			/*
			 * Set up the high/low water marks on the stream
			 * head read queue. We're unlikely to run out of
			 * resources here; fix at a later date if needed.
			 */
			if ((mop = allocb(sizeof (struct stroptions),
			    BPRI_MED)) != NULL) {
				struct stroptions *sop;
				uint_t snap;
				int fudge;

				snap = *(uint_t *)mp->b_cont->b_rptr;
				mop->b_datap->db_type = M_SETOPTS;
				mop->b_wptr += sizeof (struct stroptions);
				sop = (struct stroptions *)mop->b_rptr;
				sop->so_flags = SO_HIWAT | SO_LOWAT;
				fudge = snap <= 100 ? 4 :
				    snap <= 400 ? 2 :
				    1;
				sop->so_hiwat = SNIT_HIWAT(snap, fudge);
				sop->so_lowat = SNIT_LOWAT(snap, fudge);
				qreply(wq, mop);
			}
		}

		sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;
		miocack(wq, mp, 0, 0);
		return;

	default:
		ASSERT(0);
		return;
	}
}

/*
 * Write-side put procedure. Its main task is to detect ioctls
 * for manipulating the buffering state and hand them to sbioctl.
 * Other message types are passed on through.
 */
static void
sbwput(queue_t *wq, mblk_t *mp)
{
	struct sb *sbp = (struct sb *)wq->q_ptr;
	struct copyresp *resp;

	if (sbp->sb_flags & SB_SEND_ON_WRITE)
		sbclosechunk(sbp);
	switch (mp->b_datap->db_type) {
	case M_IOCTL:
		sbioctl(wq, mp);
		break;

	case M_IOCDATA:
		resp = (struct copyresp *)mp->b_rptr;
		if (resp->cp_rval) {
			/*
			 * Just free the message on failure.
			 */
			freemsg(mp);
			break;
		}

		switch (resp->cp_cmd) {
		case SBIOCSTIME:
		case SBIOCSCHUNK:
		case SBIOCSFLAGS:
		case SBIOCSSNAP:
		case SBIOCGTIME:
		case SBIOCGCHUNK:
		case SBIOCGSNAP:
		case SBIOCGFLAGS:
			sbioc(wq, mp);
			break;

		default:
			putnext(wq, mp);
			break;
		}
		break;

	default:
		putnext(wq, mp);
		break;
	}
}

/*
 * Read-side put procedure. It's responsible for buffering up incoming
 * messages and grouping them into aggregates according to the current
 * buffering parameters.
 */
static void
sbrput(queue_t *rq, mblk_t *mp)
{
	struct sb *sbp = (struct sb *)rq->q_ptr;

	ASSERT(sbp);

	switch (mp->b_datap->db_type) {
	case M_PROTO:
		if (sbp->sb_flags & SB_NO_PROTO_CVT) {
			sbclosechunk(sbp);
			sbsendit(rq, mp);
			break;
		} else {
			/*
			 * Convert M_PROTO to M_DATA.
			 */
			mp->b_datap->db_type = M_DATA;
		}
		/* FALLTHRU */

	case M_DATA:
		if ((sbp->sb_flags & SB_DEFER_CHUNK) &&
		    !(sbp->sb_state & SB_FRCVD)) {
			sbclosechunk(sbp);
			sbsendit(rq, mp);
			sbp->sb_state |= SB_FRCVD;
		} else
			sbaddmsg(rq, mp);

		if ((sbp->sb_ticks > 0) && !(sbp->sb_timeoutid))
			sbp->sb_timeoutid = qtimeout(sbp->sb_rq, sbtick,
			    sbp, sbp->sb_ticks);

		break;

	case M_FLUSH:
		if (*mp->b_rptr & FLUSHR) {
			/*
			 * Reset the timeout, flush the chunk currently in
			 * progress, and start a new chunk.
			 */
			if (sbp->sb_timeoutid) {
				(void) quntimeout(sbp->sb_rq,
				    sbp->sb_timeoutid);
				sbp->sb_timeoutid = 0;
			}
			if (sbp->sb_mp) {
				freemsg(sbp->sb_mp);
				sbp->sb_tail = sbp->sb_mp =
				    sbp->sb_head = NULL;
				sbp->sb_mlen = 0;
				sbp->sb_mcount = 0;
			}
			flushq(rq, FLUSHALL);
		}
		putnext(rq, mp);
		break;

	case M_CTL:
		/*
		 * A zero-length M_CTL means our timeout() popped.
		 */
		if (MBLKL(mp) == 0) {
			freemsg(mp);
			sbclosechunk(sbp);
		} else {
			sbclosechunk(sbp);
			sbsendit(rq, mp);
		}
		break;

	default:
		if (mp->b_datap->db_type <= QPCTL) {
			sbclosechunk(sbp);
			sbsendit(rq, mp);
		} else {
			/* Note: out of band */
			putnext(rq, mp);
		}
		break;
	}
}

/*
 * Read service procedure.
 */
/* ARGSUSED */
static void
sbrsrv(queue_t *rq)
{
	mblk_t *mp;

	/*
	 * High-priority messages shouldn't get here, but if one
	 * does, jam it through to avoid an infinite loop.
	 */
	while ((mp = getq(rq)) != NULL) {
		if (!canputnext(rq) && (mp->b_datap->db_type <= QPCTL)) {
			/* should only get here if SB_NO_DROPS */
			(void) putbq(rq, mp);
			return;
		}
		putnext(rq, mp);
	}
}

/*
 * Handle write-side M_IOCTL messages.
 */
static void
sbioctl(queue_t *wq, mblk_t *mp)
{
	struct sb *sbp = (struct sb *)wq->q_ptr;
	struct iocblk *iocp = (struct iocblk *)mp->b_rptr;
	struct timeval *t;
	clock_t ticks;
	mblk_t *mop;
	int transparent = iocp->ioc_count;
	mblk_t *datamp;
	int error;

	switch (iocp->ioc_cmd) {
	case SBIOCSTIME:
		if (iocp->ioc_count == TRANSPARENT) {
#ifdef _SYSCALL32_IMPL
			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
				mcopyin(mp, NULL, sizeof (struct timeval32),
				    NULL);
			} else
#endif /* _SYSCALL32_IMPL */
			{
				mcopyin(mp, NULL, sizeof (*t), NULL);
			}
			qreply(wq, mp);
		} else {
			/*
			 * Verify the argument length.
			 */
#ifdef _SYSCALL32_IMPL
			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
				struct timeval32 *t32;

				error = miocpullup(mp,
				    sizeof (struct timeval32));
				if (error != 0) {
					miocnak(wq, mp, 0, error);
					break;
				}
				t32 = (struct timeval32 *)mp->b_cont->b_rptr;
				if (t32->tv_sec < 0 || t32->tv_usec < 0) {
					miocnak(wq, mp, 0, EINVAL);
					break;
				}
				ticks = TIMEVAL_TO_TICK(t32);
			} else
#endif /* _SYSCALL32_IMPL */
			{
				error = miocpullup(mp, sizeof (struct timeval));
				if (error != 0) {
					miocnak(wq, mp, 0, error);
					break;
				}

				t = (struct timeval *)mp->b_cont->b_rptr;
				if (t->tv_sec < 0 || t->tv_usec < 0) {
					miocnak(wq, mp, 0, EINVAL);
					break;
				}
				ticks = TIMEVAL_TO_TICK(t);
			}
			sbp->sb_ticks = ticks;
			if (ticks == 0)
				sbp->sb_chunk = 0;
			miocack(wq, mp, 0, 0);
			sbclosechunk(sbp);
		}
		break;

	case SBIOCGTIME: {
		struct timeval *t;

		/*
		 * Verify the argument length.
		 */
		if (transparent != TRANSPARENT) {
#ifdef _SYSCALL32_IMPL
			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
				error = miocpullup(mp,
				    sizeof (struct timeval32));
				if (error != 0) {
					miocnak(wq, mp, 0, error);
					break;
				}
			} else
#endif /* _SYSCALL32_IMPL */
			error = miocpullup(mp, sizeof (struct timeval));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		/*
		 * If the timeout is infinite, return a range error
		 * for the ioctl.
		 */
		if (sbp->sb_ticks < 0) {
			miocnak(wq, mp, 0, ERANGE);
			break;
		}

#ifdef _SYSCALL32_IMPL
		if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
			struct timeval32 *t32;

			if (transparent == TRANSPARENT) {
				datamp = allocb(sizeof (*t32), BPRI_MED);
				if (datamp == NULL) {
					miocnak(wq, mp, 0, EAGAIN);
					break;
				}
				mcopyout(mp, NULL, sizeof (*t32), NULL, datamp);
			}

			t32 = (struct timeval32 *)mp->b_cont->b_rptr;
			TICK_TO_TIMEVAL32(sbp->sb_ticks, t32);

			if (transparent == TRANSPARENT)
				qreply(wq, mp);
			else
				miocack(wq, mp, sizeof (*t32), 0);
		} else
#endif /* _SYSCALL32_IMPL */
		{
			if (transparent == TRANSPARENT) {
				datamp = allocb(sizeof (*t), BPRI_MED);
				if (datamp == NULL) {
					miocnak(wq, mp, 0, EAGAIN);
					break;
				}
				mcopyout(mp, NULL, sizeof (*t), NULL, datamp);
			}

			t = (struct timeval *)mp->b_cont->b_rptr;
			TICK_TO_TIMEVAL(sbp->sb_ticks, t);

			if (transparent == TRANSPARENT)
				qreply(wq, mp);
			else
				miocack(wq, mp, sizeof (*t), 0);
		}
		break;
	}

	case SBIOCCTIME:
		sbp->sb_ticks = -1;
		miocack(wq, mp, 0, 0);
		break;

	case SBIOCSCHUNK:
		if (iocp->ioc_count == TRANSPARENT) {
			mcopyin(mp, NULL, sizeof (uint_t), NULL);
			qreply(wq, mp);
		} else {
			/*
			 * Verify the argument length.
			 */
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}

			/*
			 * Set up the high/low water marks on the stream
			 * head read queue. We're unlikely to run out of
			 * resources here; fix at a later date if needed.
			 */
			if ((mop = allocb(sizeof (struct stroptions),
			    BPRI_MED)) != NULL) {
				struct stroptions *sop;
				uint_t chunk;

				chunk = *(uint_t *)mp->b_cont->b_rptr;
				mop->b_datap->db_type = M_SETOPTS;
				mop->b_wptr += sizeof (struct stroptions);
				sop = (struct stroptions *)mop->b_rptr;
				sop->so_flags = SO_HIWAT | SO_LOWAT;
				sop->so_hiwat = SNIT_HIWAT(chunk, 1);
				sop->so_lowat = SNIT_LOWAT(chunk, 1);
				qreply(wq, mop);
			}

			sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
			miocack(wq, mp, 0, 0);
			sbclosechunk(sbp);
		}
		break;

	case SBIOCGCHUNK:
		/*
		 * Verify the argument length.
		 */
		if (transparent != TRANSPARENT) {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		if (transparent == TRANSPARENT) {
			datamp = allocb(sizeof (uint_t), BPRI_MED);
			if (datamp == NULL) {
				miocnak(wq, mp, 0, EAGAIN);
				break;
			}
			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
		}

		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_chunk;

		if (transparent == TRANSPARENT)
			qreply(wq, mp);
		else
			miocack(wq, mp, sizeof (uint_t), 0);
		break;

	case SBIOCSSNAP:
		if (iocp->ioc_count == TRANSPARENT) {
			mcopyin(mp, NULL, sizeof (uint_t), NULL);
			qreply(wq, mp);
		} else {
			/*
			 * Verify the argument length.
			 */
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}

			/*
			 * If chunking, don't worry about the effect of
			 * snapshot truncation on stream head flow control,
			 * since it has a relatively small bearing on the
			 * data rate onto the stream head.
			 */
			if (!sbp->sb_chunk) {
				/*
				 * Set up the high/low water marks on the
				 * stream head read queue. We're unlikely
				 * to run out of resources here; fix at a
				 * later date if needed.
				 */
				if ((mop = allocb(sizeof (struct stroptions),
				    BPRI_MED)) != NULL) {
					struct stroptions *sop;
					uint_t snap;
					int fudge;

					snap = *(uint_t *)mp->b_cont->b_rptr;
					mop->b_datap->db_type = M_SETOPTS;
					mop->b_wptr += sizeof (*sop);
					sop = (struct stroptions *)mop->b_rptr;
					sop->so_flags = SO_HIWAT | SO_LOWAT;
					fudge = (snap <= 100) ? 4 :
					    (snap <= 400) ? 2 : 1;
					sop->so_hiwat = SNIT_HIWAT(snap, fudge);
					sop->so_lowat = SNIT_LOWAT(snap, fudge);
					qreply(wq, mop);
				}
			}

			sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;

			miocack(wq, mp, 0, 0);
		}
		break;

	case SBIOCGSNAP:
		/*
		 * Verify the argument length.
		 */
		if (transparent != TRANSPARENT) {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		if (transparent == TRANSPARENT) {
			datamp = allocb(sizeof (uint_t), BPRI_MED);
			if (datamp == NULL) {
				miocnak(wq, mp, 0, EAGAIN);
				break;
			}
			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
		}

		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_snap;

		if (transparent == TRANSPARENT)
			qreply(wq, mp);
		else
			miocack(wq, mp, sizeof (uint_t), 0);
		break;

	case SBIOCSFLAGS:
		/*
		 * Set the flags.
		 */
		if (iocp->ioc_count == TRANSPARENT) {
			mcopyin(mp, NULL, sizeof (uint_t), NULL);
			qreply(wq, mp);
		} else {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
			sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
			miocack(wq, mp, 0, 0);
		}
		break;

	case SBIOCGFLAGS:
		/*
		 * Verify the argument length.
		 */
		if (transparent != TRANSPARENT) {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		if (transparent == TRANSPARENT) {
			datamp = allocb(sizeof (uint_t), BPRI_MED);
			if (datamp == NULL) {
				miocnak(wq, mp, 0, EAGAIN);
				break;
			}
			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
		}

		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_flags;

		if (transparent == TRANSPARENT)
			qreply(wq, mp);
		else
			miocack(wq, mp, sizeof (uint_t), 0);
		break;

	default:
		putnext(wq, mp);
		break;
	}
}

/*
 * Given a length l, calculate the amount of extra storage
 * required to round it up to the next multiple of the alignment a.
 */
#define	RoundUpAmt(l, a)	((l) % (a) ? (a) - ((l) % (a)) : 0)
/*
 * Calculate the additional amount of space required for alignment.
 */
#define	Align(l)		RoundUpAmt(l, sizeof (ulong_t))
/*
 * Smallest possible message size when headers are enabled.
 * This is used to calculate whether a chunk is nearly full.
 */
#define	SMALLEST_MESSAGE	(sizeof (struct sb_hdr) + _POINTER_ALIGNMENT)
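
/*
 * For example, with 8-byte ulong_t alignment, a 26-byte message needs
 * RoundUpAmt(26, 8) == 6 bytes of padding, so Align(26) == 6; a length
 * already on the boundary needs none: Align(32) == 0.
 */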

/*
 * Process a read-side M_DATA message.
 *
 * If the currently accumulating chunk doesn't have enough room
 * for the message, close off the chunk, pass it upward, and start
 * a new one. Then add the message to the current chunk, taking
 * account of the possibility that the message's size exceeds the
 * chunk size.
 *
 * If headers are enabled, add an sb_hdr header and trailing alignment
 * padding.
 *
 * To optimise performance the total number of mblks should be kept
 * to a minimum. This is achieved by using any remaining space in
 * message N for both its own padding and the header of message N+1,
 * if possible. If there's insufficient space, we allocate one message
 * to hold this 'wrapper'. (There's likely to be space beyond message
 * N, since allocb would have rounded the requested size up to one of
 * the dblk_sizes.)
 */
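/*
 * Schematically, a chunk with headers enabled is laid out as:
 *
 *	+--------+------------+-----+--------+------------+-----+
 *	| sb_hdr | msg 1 data | pad | sb_hdr | msg 2 data | pad | ...
 *	+--------+------------+-----+--------+------------+-----+
 *
 * so a consumer can walk the records by advancing sbh_totlen bytes
 * at a time. A minimal userland sketch (assuming buf/len hold one
 * chunk returned by read(2), and process() is a hypothetical
 * callback):
 *
 *	char *p = buf;
 *	while (p < buf + len) {
 *		struct sb_hdr *hdrp = (struct sb_hdr *)p;
 *		process(p + sizeof (struct sb_hdr), hdrp->sbh_msglen);
 *		p += hdrp->sbh_totlen;
 *	}
 */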
static void
sbaddmsg(queue_t *rq, mblk_t *mp)
{
	struct sb *sbp;
	struct timeval t;
	struct sb_hdr hp;
	mblk_t *wrapper;	/* padding for msg N, header for msg N+1 */
	mblk_t *last;		/* last mblk of the current message */
	size_t wrapperlen;	/* length of header + padding */
	size_t origlen;		/* data length before truncation */
	size_t pad;		/* bytes required to align header */

	sbp = (struct sb *)rq->q_ptr;

	origlen = msgdsize(mp);

	/*
	 * Truncate the message.
	 */
	if ((sbp->sb_snap > 0) && (origlen > sbp->sb_snap) &&
	    (adjmsg(mp, -(origlen - sbp->sb_snap)) == 1))
		hp.sbh_totlen = hp.sbh_msglen = sbp->sb_snap;
	else
		hp.sbh_totlen = hp.sbh_msglen = origlen;

	if (sbp->sb_flags & SB_NO_HEADER) {

		/*
		 * Would the inclusion of this message overflow the current
		 * chunk? If so, close the chunk off and start a new one.
		 */
		if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
			sbclosechunk(sbp);
		/*
		 * First message too big for the chunk - just send it up.
		 * This is always true when we're not chunking.
		 */
		if (hp.sbh_totlen > sbp->sb_chunk) {
			sbsendit(rq, mp);
			return;
		}

		/*
		 * We now know that the message will fit in the chunk, so
		 * link it onto the end. Since linkb() walks the entire
		 * chain, we keep a pointer to the first mblk of the last
		 * message appended and call linkb() on that last message,
		 * rather than performing the O(n) linkb() operation on
		 * the whole chain. sb_head isn't needed in this
		 * SB_NO_HEADER mode.
		 */
		if (sbp->sb_mp)
			linkb(sbp->sb_tail, mp);
		else
			sbp->sb_mp = mp;

		sbp->sb_tail = mp;
		sbp->sb_mlen += hp.sbh_totlen;
		sbp->sb_mcount++;
	} else {
		/* The timestamp must be taken immediately. */
		uniqtime(&t);
		TIMEVAL_TO_TIMEVAL32(&hp.sbh_timestamp, &t);

		pad = Align(hp.sbh_totlen);
		hp.sbh_totlen += sizeof (hp);
		hp.sbh_totlen += pad;

		/*
		 * Would the inclusion of this message overflow the current
		 * chunk? If so, close the chunk off and start a new one.
		 */
		if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
			sbclosechunk(sbp);

		if (sbp->sb_head == NULL) {
			/* Allocate the leading header of a new chunk. */
			sbp->sb_head = allocb(sizeof (hp), BPRI_MED);
			if (sbp->sb_head == NULL) {
				/*
				 * Memory allocation failure.
				 * This will need to be revisited,
				 * since using certain flag combinations
				 * can result in messages being dropped
				 * silently.
				 */
				freemsg(mp);
				sbp->sb_drops++;
				return;
			}
			sbp->sb_mp = sbp->sb_head;
		}

		/*
		 * Copy the header into the message.
		 */
		hp.sbh_drops = sbp->sb_drops;
		hp.sbh_origlen = origlen;
		(void) memcpy(sbp->sb_head->b_wptr, (char *)&hp, sizeof (hp));
		sbp->sb_head->b_wptr += sizeof (hp);

		ASSERT(sbp->sb_head->b_wptr <= sbp->sb_head->b_datap->db_lim);

		/*
		 * Join the message to the chunk.
		 */
		linkb(sbp->sb_head, mp);

		sbp->sb_mcount++;
		sbp->sb_mlen += hp.sbh_totlen;

		/*
		 * If the first message alone is too big for the chunk,
		 * close the chunk now.
		 * If the next message would immediately cause the chunk to
		 * overflow, we may as well close the chunk now too. The
		 * next message is certain to be at least SMALLEST_MESSAGE
		 * in size.
		 */
		if (hp.sbh_totlen + SMALLEST_MESSAGE > sbp->sb_chunk) {
			sbclosechunk(sbp);
			return;
		}

		/*
		 * Find space for the wrapper. The wrapper consists of:
		 *
		 * 1) Padding for this message (this is to ensure each header
		 * begins on an 8-byte boundary in the userland buffer).
		 *
		 * 2) Space for the next message's header, in case the next
		 * message will fit in this chunk.
		 *
		 * It may be possible to append the wrapper to the last mblk
		 * of the message, but only if we 'own' the data. If the
		 * dblk has been shared through dupmsg() we mustn't alter it.
		 */

		wrapperlen = (sizeof (hp) + pad);

		/* Is there space for the wrapper beyond the message's data? */
		for (last = mp; last->b_cont; last = last->b_cont)
			;

		if ((wrapperlen <= MBLKTAIL(last)) &&
		    (last->b_datap->db_ref == 1)) {
			if (pad > 0) {
				/*
				 * Pad with zeroes to the next pointer
				 * boundary (we don't want to disclose
				 * kernel data to users), then advance wptr.
				 */
				(void) memset(last->b_wptr, 0, pad);
				last->b_wptr += pad;
			}
			/* Remember where to write the header information. */
			sbp->sb_head = last;
		} else {
			/* Have to allocate additional space for the wrapper */
			wrapper = allocb(wrapperlen, BPRI_MED);
			if (wrapper == NULL) {
				sbclosechunk(sbp);
				return;
			}
			if (pad > 0) {
				/*
				 * Pad with zeroes (we don't want to
				 * disclose kernel data to users).
				 */
				(void) memset(wrapper->b_wptr, 0, pad);
				wrapper->b_wptr += pad;
			}
			/* Link the wrapper msg onto the end of the chunk. */
			linkb(mp, wrapper);
			/* Remember to write the next header in this wrapper */
			sbp->sb_head = wrapper;
		}
	}
}

/*
 * Called from timeout().
 * Signal a timeout by passing a zero-length M_CTL msg up the read side
 * to synchronize with any active module threads (open, close, wput, rput).
 */
static void
sbtick(void *arg)
{
	struct sb *sbp = arg;
	queue_t *rq;

	ASSERT(sbp);

	rq = sbp->sb_rq;
	sbp->sb_timeoutid = 0;		/* timeout has fired */

	if (putctl(rq, M_CTL) == 0)	/* failure */
		sbp->sb_timeoutid = qtimeout(rq, sbtick, sbp, sbp->sb_ticks);
}

/*
 * Close off the currently accumulating chunk and pass
 * it upward. Takes care of resetting timers as well.
 *
 * This routine is called both directly and as a result
 * of the chunk timeout expiring.
 */
static void
sbclosechunk(struct sb *sbp)
{
	mblk_t *mp;
	queue_t *rq;

	ASSERT(sbp);

	if (sbp->sb_timeoutid) {
		(void) quntimeout(sbp->sb_rq, sbp->sb_timeoutid);
		sbp->sb_timeoutid = 0;
	}

	mp = sbp->sb_mp;
	rq = sbp->sb_rq;

	/*
	 * If there's currently a chunk in progress, close it off
	 * and try to send it up.
	 */
	if (mp) {
		sbsendit(rq, mp);
	}

	/*
	 * Clear the old chunk; we're ready for new messages.
	 */
	sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
	sbp->sb_mlen = 0;
	sbp->sb_mcount = 0;
	if (sbp->sb_flags & SB_DEFER_CHUNK)
		sbp->sb_state &= ~SB_FRCVD;
}

static void
sbsendit(queue_t *rq, mblk_t *mp)
{
	struct sb *sbp = (struct sb *)rq->q_ptr;

	if (!canputnext(rq)) {
		if (sbp->sb_flags & SB_NO_DROPS)
			(void) putq(rq, mp);
		else {
			freemsg(mp);
			sbp->sb_drops += sbp->sb_mcount;
		}
		return;
	}
	/*
	 * If there are messages on the queue already, keep queueing
	 * them, since they need to be processed in order.
	 */
	if (qsize(rq) > 0) {
		/* should only get here if SB_NO_DROPS */
		(void) putq(rq, mp);
	} else
		putnext(rq, mp);
}