/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */

/*
 * STREAMS Buffering module
 *
 * This STREAMS module collects incoming messages from modules below
 * it on the stream and buffers them up into a smaller number of
 * aggregated messages.  Its main purpose is to reduce overhead by
 * cutting down on the number of read (or getmsg) calls the client
 * user process must make.
 *  - only M_DATA is buffered.
 *  - multithreading assumes the module is configured D_MTQPAIR.
 *  - packets are lost only if the flag SB_NO_HEADER is clear and
 *    buffer allocation fails.
 *  - in-order message transmission.  This is enforced for messages
 *    other than high priority messages.
 *  - zero-length messages on the read side are not passed up the
 *    stream but are used internally for synchronization.
 * FLAGS:
 *  - SB_NO_PROTO_CVT - no conversion of M_PROTO messages to M_DATA.
 *    (conversion is the default for backwards compatibility,
 *    hence the negative logic).
 *  - SB_NO_HEADER - no headers in buffered data.
 *    (adding headers is the default for backwards compatibility,
 *    hence the negative logic).
 *  - SB_DEFER_CHUNK - provides improved response time in question-answer
 *    applications.  Buffering is not enabled until the second message
 *    is received on the read side within the sb_ticks interval.
 *    This option is often used in combination with SB_SEND_ON_WRITE.
 *  - SB_SEND_ON_WRITE - a write-side message causes any pending buffered
 *    read data to be sent upstream immediately.
 *  - SB_NO_DROPS - bufmod behaves transparently in flow control and
 *    propagates the blocked flow condition downstream.  If this flag
 *    is clear (the default), messages are dropped if the upstream
 *    flow is blocked.
 */
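
/*
 * For illustration only: a minimal sketch, not part of this module, of
 * how a user process might push and configure bufmod.  The device path
 * and parameter values below are arbitrary examples.
 *
 *	int fd = open("/dev/le", O_RDWR);
 *	struct timeval tv = { 0, 50000 };	(50ms buffering interval)
 *	uint_t chunk = 8192;			(aggregate chunk size)
 *	uint_t snap = 128;			(truncate packets to 128 bytes)
 *
 *	ioctl(fd, I_PUSH, "bufmod");
 *	ioctl(fd, SBIOCSTIME, &tv);
 *	ioctl(fd, SBIOCSCHUNK, &chunk);
 *	ioctl(fd, SBIOCSSNAP, &snap);
 *
 * The process then reads aggregated chunks with read(2) or getmsg(2)
 * instead of making one call per packet.
 */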

#include <sys/types.h>
#include <sys/errno.h>
#include <sys/debug.h>
#include <sys/stropts.h>
#include <sys/time.h>
#include <sys/stream.h>
#include <sys/conf.h>
#include <sys/ddi.h>
#include <sys/sunddi.h>
#include <sys/kmem.h>
#include <sys/strsun.h>
#include <sys/bufmod.h>
#include <sys/modctl.h>
#include <sys/isa_defs.h>

/*
 * Per-Stream state information.
 *
 * If sb_ticks is negative, we don't deliver chunks until they're
 * full.  If it's zero, we deliver every packet as it arrives.  (In
 * this case we force sb_chunk to zero, to make the implementation
 * easier.)  Otherwise, sb_ticks gives the number of ticks in a
 * buffering interval.  The interval begins when a read-side data
 * message is received and no timeout is active.  If sb_snap is
 * zero, the message is not truncated.
 */
struct sb {
	queue_t	*sb_rq;		/* our rq */
	mblk_t	*sb_mp;		/* partial chunk */
	mblk_t	*sb_head;	/* pre-allocated space for the next header */
	mblk_t	*sb_tail;	/* first mblk of last message appended */
	uint_t	sb_mlen;	/* sb_mp length */
	uint_t	sb_mcount;	/* input msg count in sb_mp */
	uint_t	sb_chunk;	/* max chunk size */
	clock_t	sb_ticks;	/* timeout interval */
	timeout_id_t sb_timeoutid; /* qtimeout() id */
	uint_t	sb_drops;	/* cumulative # discarded msgs */
	uint_t	sb_snap;	/* snapshot length */
	uint_t	sb_flags;	/* flags field */
	uint_t	sb_state;	/* state variable */
};
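
/*
 * How the chunk pointers relate while a chunk is being accumulated
 * (headers enabled), sketched for reference:
 *
 *	sb_mp -> [sb_hdr][msg 1 data][pad + sb_hdr][msg 2 data] ...
 *	                             ^
 *	                             sb_head: the mblk holding the space
 *	                             where the next message's header (and
 *	                             the previous message's padding) is
 *	                             written.
 *
 * In SB_NO_HEADER mode sb_head is unused; sb_tail remembers the first
 * mblk of the last message appended so each new message can be linked
 * on with linkb(sb_tail, mp) instead of walking the whole chain.
 */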

/*
 * Function prototypes.
 */
static int sbopen(queue_t *, dev_t *, int, int, cred_t *);
static int sbclose(queue_t *, int, cred_t *);
static void sbwput(queue_t *, mblk_t *);
static void sbrput(queue_t *, mblk_t *);
static void sbrsrv(queue_t *);
static void sbioctl(queue_t *, mblk_t *);
static void sbaddmsg(queue_t *, mblk_t *);
static void sbtick(void *);
static void sbclosechunk(struct sb *);
static void sbsendit(queue_t *, mblk_t *);

static struct module_info sb_minfo = {
	21,		/* mi_idnum */
	"bufmod",	/* mi_idname */
	0,		/* mi_minpsz */
	INFPSZ,		/* mi_maxpsz */
	1,		/* mi_hiwat */
	0		/* mi_lowat */
};

static struct qinit sb_rinit = {
	(int (*)())sbrput,	/* qi_putp */
	(int (*)())sbrsrv,	/* qi_srvp */
	sbopen,			/* qi_qopen */
	sbclose,		/* qi_qclose */
	NULL,			/* qi_qadmin */
	&sb_minfo,		/* qi_minfo */
	NULL			/* qi_mstat */
};

static struct qinit sb_winit = {
	(int (*)())sbwput,	/* qi_putp */
	NULL,			/* qi_srvp */
	NULL,			/* qi_qopen */
	NULL,			/* qi_qclose */
	NULL,			/* qi_qadmin */
	&sb_minfo,		/* qi_minfo */
	NULL			/* qi_mstat */
};

static struct streamtab sb_info = {
	&sb_rinit,	/* st_rdinit */
	&sb_winit,	/* st_wrinit */
	NULL,		/* st_muxrinit */
	NULL		/* st_muxwinit */
};


/*
 * This is the loadable module wrapper.
 */

static struct fmodsw fsw = {
	"bufmod",
	&sb_info,
	D_MTQPAIR | D_MP
};

/*
 * Module linkage information for the kernel.
 */

static struct modlstrmod modlstrmod = {
	&mod_strmodops, "streams buffer mod", &fsw
};

static struct modlinkage modlinkage = {
	MODREV_1, { &modlstrmod, NULL }
};


int
_init(void)
{
	return (mod_install(&modlinkage));
}

int
_fini(void)
{
	return (mod_remove(&modlinkage));
}

int
_info(struct modinfo *modinfop)
{
	return (mod_info(&modlinkage, modinfop));
}


/* ARGSUSED */
static int
sbopen(queue_t *rq, dev_t *dev, int oflag, int sflag, cred_t *crp)
{
	struct sb *sbp;

	ASSERT(rq);

	if (sflag != MODOPEN)
		return (EINVAL);

	if (rq->q_ptr)
		return (0);

	/*
	 * Allocate and initialize the per-Stream structure.
	 */
	sbp = kmem_alloc(sizeof (struct sb), KM_SLEEP);
	sbp->sb_rq = rq;
	sbp->sb_ticks = -1;
	sbp->sb_chunk = SB_DFLT_CHUNK;
	sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
	sbp->sb_mlen = 0;
	sbp->sb_mcount = 0;
	sbp->sb_timeoutid = 0;
	sbp->sb_drops = 0;
	sbp->sb_snap = 0;
	sbp->sb_flags = 0;
	sbp->sb_state = 0;

	rq->q_ptr = WR(rq)->q_ptr = sbp;

	qprocson(rq);

	return (0);
}

/* ARGSUSED1 */
static int
sbclose(queue_t *rq, int flag, cred_t *credp)
{
	struct sb *sbp = (struct sb *)rq->q_ptr;

	ASSERT(sbp);

	qprocsoff(rq);

	/*
	 * Cancel an outstanding timeout.
	 */
	if (sbp->sb_timeoutid != 0) {
		(void) quntimeout(rq, sbp->sb_timeoutid);
		sbp->sb_timeoutid = 0;
	}

	/*
	 * Free the current chunk.
	 */
	if (sbp->sb_mp) {
		freemsg(sbp->sb_mp);
		sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
		sbp->sb_mlen = 0;
	}

	/*
	 * Free the per-Stream structure.
	 */
	kmem_free((caddr_t)sbp, sizeof (struct sb));
	rq->q_ptr = WR(rq)->q_ptr = NULL;

	return (0);
}

/*
 * The correction factor is introduced to compensate for
 * whatever assumptions the modules below have made about
 * how much traffic is flowing through the stream, and the fact
 * that bufmod may be snipping messages to the sb_snap length.
 */
#define	SNIT_HIWAT(msgsize, fudge)	((4 * (msgsize) * (fudge)) + 512)
#define	SNIT_LOWAT(msgsize, fudge)	((2 * (msgsize) * (fudge)) + 256)
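
/*
 * Worked example of the water marks: with a chunk size of 8192 bytes
 * and a fudge factor of 1, SNIT_HIWAT(8192, 1) = 4 * 8192 + 512 = 33280
 * and SNIT_LOWAT(8192, 1) = 2 * 8192 + 256 = 16640.  With a small
 * snapshot length the fudge factor scales the marks back up: for
 * snap = 100 the code below picks fudge = 4, giving
 * SNIT_HIWAT(100, 4) = 2112 and SNIT_LOWAT(100, 4) = 1056.
 */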

/*
 * Process the data part of a transparent ioctl, as copied in by the
 * stream head in response to an earlier mcopyin() request (see
 * sbioctl() below).
 */
static void
sbioc(queue_t *wq, mblk_t *mp)
{
	struct iocblk *iocp;
	struct sb *sbp = (struct sb *)wq->q_ptr;
	clock_t ticks;
	mblk_t *mop;

	iocp = (struct iocblk *)mp->b_rptr;

	switch (iocp->ioc_cmd) {
	case SBIOCGCHUNK:
	case SBIOCGSNAP:
	case SBIOCGFLAGS:
	case SBIOCGTIME:
		miocack(wq, mp, 0, 0);
		return;

	case SBIOCSTIME:
#ifdef _SYSCALL32_IMPL
		if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
			struct timeval32 *t32;

			t32 = (struct timeval32 *)mp->b_cont->b_rptr;
			if (t32->tv_sec < 0 || t32->tv_usec < 0) {
				miocnak(wq, mp, 0, EINVAL);
				break;
			}
			ticks = TIMEVAL_TO_TICK(t32);
		} else
#endif /* _SYSCALL32_IMPL */
		{
			struct timeval *tb;

			tb = (struct timeval *)mp->b_cont->b_rptr;

			if (tb->tv_sec < 0 || tb->tv_usec < 0) {
				miocnak(wq, mp, 0, EINVAL);
				break;
			}
			ticks = TIMEVAL_TO_TICK(tb);
		}
		sbp->sb_ticks = ticks;
		if (ticks == 0)
			sbp->sb_chunk = 0;
		miocack(wq, mp, 0, 0);
		sbclosechunk(sbp);
		return;

	case SBIOCSCHUNK:
		/*
		 * Set up hi/lo water marks on the stream head read queue.
		 * It's unlikely we'll run out of resources here; fix at a
		 * later date.
		 */
		if ((mop = allocb(sizeof (struct stroptions),
		    BPRI_MED)) != NULL) {
			struct stroptions *sop;
			uint_t chunk;

			chunk = *(uint_t *)mp->b_cont->b_rptr;
			mop->b_datap->db_type = M_SETOPTS;
			mop->b_wptr += sizeof (struct stroptions);
			sop = (struct stroptions *)mop->b_rptr;
			sop->so_flags = SO_HIWAT | SO_LOWAT;
			sop->so_hiwat = SNIT_HIWAT(chunk, 1);
			sop->so_lowat = SNIT_LOWAT(chunk, 1);
			qreply(wq, mop);
		}

		sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
		miocack(wq, mp, 0, 0);
		sbclosechunk(sbp);
		return;

	case SBIOCSFLAGS:
		sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
		miocack(wq, mp, 0, 0);
		return;

	case SBIOCSSNAP:
		/*
		 * If chunking, don't worry about the effect of snipping
		 * messages to the snapshot length on stream head flow
		 * control, since it has a relatively small bearing on the
		 * data rate onto the stream head.
		 */
		if (!sbp->sb_chunk) {
			/*
			 * Set up hi/lo water marks on the stream head read
			 * queue.  It's unlikely we'll run out of resources
			 * here; fix at a later date.
			 */
			if ((mop = allocb(sizeof (struct stroptions),
			    BPRI_MED)) != NULL) {
				struct stroptions *sop;
				uint_t snap;
				int fudge;

				snap = *(uint_t *)mp->b_cont->b_rptr;
				mop->b_datap->db_type = M_SETOPTS;
				mop->b_wptr += sizeof (struct stroptions);
				sop = (struct stroptions *)mop->b_rptr;
				sop->so_flags = SO_HIWAT | SO_LOWAT;
				fudge = snap <= 100 ? 4 :
				    snap <= 400 ? 2 :
				    1;
				sop->so_hiwat = SNIT_HIWAT(snap, fudge);
				sop->so_lowat = SNIT_LOWAT(snap, fudge);
				qreply(wq, mop);
			}
		}

		sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;
		miocack(wq, mp, 0, 0);
		return;

	default:
		ASSERT(0);
		return;
	}
}

/*
 * Write-side put procedure.  Its main task is to detect ioctls
 * for manipulating the buffering state and hand them to sbioctl.
 * Other message types are passed on through.
 */
static void
sbwput(queue_t *wq, mblk_t *mp)
{
	struct sb *sbp = (struct sb *)wq->q_ptr;
	struct copyresp *resp;

	if (sbp->sb_flags & SB_SEND_ON_WRITE)
		sbclosechunk(sbp);

	switch (mp->b_datap->db_type) {
	case M_IOCTL:
		sbioctl(wq, mp);
		break;

	case M_IOCDATA:
		resp = (struct copyresp *)mp->b_rptr;
		if (resp->cp_rval) {
			/*
			 * Just free the message on failure.
			 */
			freemsg(mp);
			break;
		}

		switch (resp->cp_cmd) {
		case SBIOCSTIME:
		case SBIOCSCHUNK:
		case SBIOCSFLAGS:
		case SBIOCSSNAP:
		case SBIOCGTIME:
		case SBIOCGCHUNK:
		case SBIOCGSNAP:
		case SBIOCGFLAGS:
			sbioc(wq, mp);
			break;

		default:
			putnext(wq, mp);
			break;
		}
		break;

	default:
		putnext(wq, mp);
		break;
	}
}
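
/*
 * SB_DEFER_CHUNK and SB_SEND_ON_WRITE together, on a timeline (see the
 * M_DATA handling in sbrput() below):
 *
 *	1. While SB_FRCVD is clear, an incoming read-side message is
 *	   passed up immediately and SB_FRCVD is set, so a lone
 *	   request-response exchange sees no buffering delay.
 *	2. Subsequent messages are accumulated into chunks as usual;
 *	   SB_FRCVD is cleared again each time a chunk is closed.
 *	3. With SB_SEND_ON_WRITE, a write-side message (e.g. the
 *	   client's next request) flushes any partial read-side chunk
 *	   immediately.
 */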

/*
 * Read-side put procedure.  It's responsible for buffering up incoming
 * messages and grouping them into aggregates according to the current
 * buffering parameters.
 */
static void
sbrput(queue_t *rq, mblk_t *mp)
{
	struct sb *sbp = (struct sb *)rq->q_ptr;

	ASSERT(sbp);

	switch (mp->b_datap->db_type) {
	case M_PROTO:
		if (sbp->sb_flags & SB_NO_PROTO_CVT) {
			sbclosechunk(sbp);
			sbsendit(rq, mp);
			break;
		} else {
			/*
			 * Convert M_PROTO to M_DATA.
			 */
			mp->b_datap->db_type = M_DATA;
		}
		/* FALLTHRU */

	case M_DATA:
		if ((sbp->sb_flags & SB_DEFER_CHUNK) &&
		    !(sbp->sb_state & SB_FRCVD)) {
			sbclosechunk(sbp);
			sbsendit(rq, mp);
			sbp->sb_state |= SB_FRCVD;
		} else
			sbaddmsg(rq, mp);

		if ((sbp->sb_ticks > 0) && !(sbp->sb_timeoutid))
			sbp->sb_timeoutid = qtimeout(sbp->sb_rq, sbtick,
			    sbp, sbp->sb_ticks);

		break;

	case M_FLUSH:
		if (*mp->b_rptr & FLUSHR) {
			/*
			 * Reset the timeout, flush the chunk currently in
			 * progress, and start a new chunk.
			 */
			if (sbp->sb_timeoutid) {
				(void) quntimeout(sbp->sb_rq,
				    sbp->sb_timeoutid);
				sbp->sb_timeoutid = 0;
			}
			if (sbp->sb_mp) {
				freemsg(sbp->sb_mp);
				sbp->sb_tail = sbp->sb_mp =
				    sbp->sb_head = NULL;
				sbp->sb_mlen = 0;
				sbp->sb_mcount = 0;
			}
			flushq(rq, FLUSHALL);
		}
		putnext(rq, mp);
		break;

	case M_CTL:
		/*
		 * A zero-length M_CTL means our timeout() popped.
		 */
		if (MBLKL(mp) == 0) {
			freemsg(mp);
			sbclosechunk(sbp);
		} else {
			sbclosechunk(sbp);
			sbsendit(rq, mp);
		}
		break;

	default:
		if (mp->b_datap->db_type <= QPCTL) {
			sbclosechunk(sbp);
			sbsendit(rq, mp);
		} else {
			/* Note: out of band */
			putnext(rq, mp);
		}
		break;
	}
}

/*
 * Read-side service procedure.
 */
/* ARGSUSED */
static void
sbrsrv(queue_t *rq)
{
	mblk_t *mp;

	/*
	 * High priority messages shouldn't get here but if
	 * one does, jam it through to avoid an infinite loop.
	 */
	while ((mp = getq(rq)) != NULL) {
		if (!canputnext(rq) && (mp->b_datap->db_type <= QPCTL)) {
			/* should only get here if SB_NO_DROPS */
			(void) putbq(rq, mp);
			return;
		}
		putnext(rq, mp);
	}
}
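
/*
 * Note on the ioctl plumbing below.  For a transparent ioctl
 * (ioc_count == TRANSPARENT) the argument still lives in user space,
 * so sbioctl() replies with an mcopyin() request; the stream head
 * copies the data in and hands it back as an M_IOCDATA message, which
 * sbwput() routes to sbioc() to apply the new value.  The "get" ioctls
 * work the same way in reverse, using mcopyout().  For a
 * non-transparent (I_STR) ioctl the argument already follows the
 * iocblk, so it is validated with miocpullup() and handled here
 * directly.
 */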

/*
 * Handle write-side M_IOCTL messages.
 */
static void
sbioctl(queue_t *wq, mblk_t *mp)
{
	struct sb *sbp = (struct sb *)wq->q_ptr;
	struct iocblk *iocp = (struct iocblk *)mp->b_rptr;
	struct timeval *t;
	clock_t ticks;
	mblk_t *mop;
	int transparent = iocp->ioc_count;
	mblk_t *datamp;
	int error;

	switch (iocp->ioc_cmd) {
	case SBIOCSTIME:
		if (iocp->ioc_count == TRANSPARENT) {
#ifdef _SYSCALL32_IMPL
			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
				mcopyin(mp, NULL, sizeof (struct timeval32),
				    NULL);
			} else
#endif /* _SYSCALL32_IMPL */
			{
				mcopyin(mp, NULL, sizeof (*t), NULL);
			}
			qreply(wq, mp);
		} else {
			/*
			 * Verify argument length.
			 */
#ifdef _SYSCALL32_IMPL
			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
				struct timeval32 *t32;

				error = miocpullup(mp,
				    sizeof (struct timeval32));
				if (error != 0) {
					miocnak(wq, mp, 0, error);
					break;
				}
				t32 = (struct timeval32 *)mp->b_cont->b_rptr;
				if (t32->tv_sec < 0 || t32->tv_usec < 0) {
					miocnak(wq, mp, 0, EINVAL);
					break;
				}
				ticks = TIMEVAL_TO_TICK(t32);
			} else
#endif /* _SYSCALL32_IMPL */
			{
				error = miocpullup(mp,
				    sizeof (struct timeval));
				if (error != 0) {
					miocnak(wq, mp, 0, error);
					break;
				}

				t = (struct timeval *)mp->b_cont->b_rptr;
				if (t->tv_sec < 0 || t->tv_usec < 0) {
					miocnak(wq, mp, 0, EINVAL);
					break;
				}
				ticks = TIMEVAL_TO_TICK(t);
			}
			sbp->sb_ticks = ticks;
			if (ticks == 0)
				sbp->sb_chunk = 0;
			miocack(wq, mp, 0, 0);
			sbclosechunk(sbp);
		}
		break;

	case SBIOCGTIME: {
		struct timeval *t;

		/*
		 * Verify argument length.
		 */
		if (transparent != TRANSPARENT) {
#ifdef _SYSCALL32_IMPL
			if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
				error = miocpullup(mp,
				    sizeof (struct timeval32));
				if (error != 0) {
					miocnak(wq, mp, 0, error);
					break;
				}
			} else
#endif /* _SYSCALL32_IMPL */
				error = miocpullup(mp,
				    sizeof (struct timeval));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		/*
		 * If the timeout is infinite, return a range error
		 * for the ioctl.
		 */
		if (sbp->sb_ticks < 0) {
			miocnak(wq, mp, 0, ERANGE);
			break;
		}

#ifdef _SYSCALL32_IMPL
		if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
			struct timeval32 *t32;

			if (transparent == TRANSPARENT) {
				datamp = allocb(sizeof (*t32), BPRI_MED);
				if (datamp == NULL) {
					miocnak(wq, mp, 0, EAGAIN);
					break;
				}
				mcopyout(mp, NULL, sizeof (*t32), NULL,
				    datamp);
			}

			t32 = (struct timeval32 *)mp->b_cont->b_rptr;
			TICK_TO_TIMEVAL32(sbp->sb_ticks, t32);

			if (transparent == TRANSPARENT)
				qreply(wq, mp);
			else
				miocack(wq, mp, sizeof (*t32), 0);
		} else
#endif /* _SYSCALL32_IMPL */
		{
			if (transparent == TRANSPARENT) {
				datamp = allocb(sizeof (*t), BPRI_MED);
				if (datamp == NULL) {
					miocnak(wq, mp, 0, EAGAIN);
					break;
				}
				mcopyout(mp, NULL, sizeof (*t), NULL, datamp);
			}

			t = (struct timeval *)mp->b_cont->b_rptr;
			TICK_TO_TIMEVAL(sbp->sb_ticks, t);

			if (transparent == TRANSPARENT)
				qreply(wq, mp);
			else
				miocack(wq, mp, sizeof (*t), 0);
		}
		break;
	}

	case SBIOCCTIME:
		sbp->sb_ticks = -1;
		miocack(wq, mp, 0, 0);
		break;

	case SBIOCSCHUNK:
		if (iocp->ioc_count == TRANSPARENT) {
			mcopyin(mp, NULL, sizeof (uint_t), NULL);
			qreply(wq, mp);
		} else {
			/*
			 * Verify argument length.
			 */
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}

			/*
			 * Set up hi/lo water marks on the stream head read
			 * queue.  It's unlikely we'll run out of resources
			 * here; fix at a later date.
			 */
			if ((mop = allocb(sizeof (struct stroptions),
			    BPRI_MED)) != NULL) {
				struct stroptions *sop;
				uint_t chunk;

				chunk = *(uint_t *)mp->b_cont->b_rptr;
				mop->b_datap->db_type = M_SETOPTS;
				mop->b_wptr += sizeof (struct stroptions);
				sop = (struct stroptions *)mop->b_rptr;
				sop->so_flags = SO_HIWAT | SO_LOWAT;
				sop->so_hiwat = SNIT_HIWAT(chunk, 1);
				sop->so_lowat = SNIT_LOWAT(chunk, 1);
				qreply(wq, mop);
			}

			sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
			miocack(wq, mp, 0, 0);
			sbclosechunk(sbp);
		}
		break;

	case SBIOCGCHUNK:
		/*
		 * Verify argument length.
		 */
		if (transparent != TRANSPARENT) {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		if (transparent == TRANSPARENT) {
			datamp = allocb(sizeof (uint_t), BPRI_MED);
			if (datamp == NULL) {
				miocnak(wq, mp, 0, EAGAIN);
				break;
			}
			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
		}

		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_chunk;

		if (transparent == TRANSPARENT)
			qreply(wq, mp);
		else
			miocack(wq, mp, sizeof (uint_t), 0);
		break;

	case SBIOCSSNAP:
		if (iocp->ioc_count == TRANSPARENT) {
			mcopyin(mp, NULL, sizeof (uint_t), NULL);
			qreply(wq, mp);
		} else {
			/*
			 * Verify argument length.
			 */
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}

			/*
			 * If chunking, don't worry about the effect of
			 * snipping messages to the snapshot length on
			 * stream head flow control, since it has a
			 * relatively small bearing on the data rate onto
			 * the stream head.
			 */
			if (!sbp->sb_chunk) {
				/*
				 * Set up hi/lo water marks on the stream
				 * head read queue.  It's unlikely we'll
				 * run out of resources here; fix at a
				 * later date.
				 */
				if ((mop = allocb(sizeof (struct stroptions),
				    BPRI_MED)) != NULL) {
					struct stroptions *sop;
					uint_t snap;
					int fudge;

					snap = *(uint_t *)mp->b_cont->b_rptr;
					mop->b_datap->db_type = M_SETOPTS;
					mop->b_wptr += sizeof (*sop);
					sop = (struct stroptions *)mop->b_rptr;
					sop->so_flags = SO_HIWAT | SO_LOWAT;
					fudge = (snap <= 100) ? 4 :
					    (snap <= 400) ? 2 : 1;
					sop->so_hiwat = SNIT_HIWAT(snap, fudge);
					sop->so_lowat = SNIT_LOWAT(snap, fudge);
					qreply(wq, mop);
				}
			}

			sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;

			miocack(wq, mp, 0, 0);
		}
		break;

	case SBIOCGSNAP:
		/*
		 * Verify argument length.
		 */
		if (transparent != TRANSPARENT) {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		if (transparent == TRANSPARENT) {
			datamp = allocb(sizeof (uint_t), BPRI_MED);
			if (datamp == NULL) {
				miocnak(wq, mp, 0, EAGAIN);
				break;
			}
			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
		}

		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_snap;

		if (transparent == TRANSPARENT)
			qreply(wq, mp);
		else
			miocack(wq, mp, sizeof (uint_t), 0);
		break;

	case SBIOCSFLAGS:
		/*
		 * Set the flags.
		 */
		if (iocp->ioc_count == TRANSPARENT) {
			mcopyin(mp, NULL, sizeof (uint_t), NULL);
			qreply(wq, mp);
		} else {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
			sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
			miocack(wq, mp, 0, 0);
		}
		break;

	case SBIOCGFLAGS:
		/*
		 * Verify argument length.
		 */
		if (transparent != TRANSPARENT) {
			error = miocpullup(mp, sizeof (uint_t));
			if (error != 0) {
				miocnak(wq, mp, 0, error);
				break;
			}
		}

		if (transparent == TRANSPARENT) {
			datamp = allocb(sizeof (uint_t), BPRI_MED);
			if (datamp == NULL) {
				miocnak(wq, mp, 0, EAGAIN);
				break;
			}
			mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
		}

		*(uint_t *)mp->b_cont->b_rptr = sbp->sb_flags;

		if (transparent == TRANSPARENT)
			qreply(wq, mp);
		else
			miocack(wq, mp, sizeof (uint_t), 0);
		break;

	default:
		putnext(wq, mp);
		break;
	}
}

/*
 * Given a length l, calculate the amount of extra storage
 * required to round it up to the next multiple of the alignment a.
 */
#define	RoundUpAmt(l, a)	((l) % (a) ? (a) - ((l) % (a)) : 0)
/*
 * Calculate the additional amount of space required for alignment.
 */
#define	Align(l)		RoundUpAmt(l, sizeof (ulong_t))
/*
 * Smallest possible message size when headers are enabled.
 * This is used to calculate whether a chunk is nearly full.
 */
#define	SMALLEST_MESSAGE	(sizeof (struct sb_hdr) + _POINTER_ALIGNMENT)
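
/*
 * Alignment example: on a kernel where sizeof (ulong_t) is 8, a
 * truncated message of 13 bytes needs RoundUpAmt(13, 8) = 8 - (13 % 8)
 * = 3 bytes of padding, so that the next sb_hdr starts on an 8-byte
 * boundary in the user's buffer.  A chunk as seen by the user is thus:
 *
 *	[sb_hdr][data...........][pad][sb_hdr][data......][pad] ...
 */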

/*
 * Process a read-side M_DATA message.
 *
 * If the currently accumulating chunk doesn't have enough room
 * for the message, close off the chunk, pass it upward, and start
 * a new one.  Then add the message to the current chunk, taking
 * account of the possibility that the message's size exceeds the
 * chunk size.
 *
 * If headers are enabled, add an sb_hdr header and trailing alignment
 * padding.
 *
 * To optimise performance the total number of mblks should be kept
 * to a minimum.  This is achieved by using any remaining space in
 * message N for both its own padding as well as the header of message
 * N+1 if possible.  If there's insufficient space we allocate one
 * message to hold this 'wrapper'.  (There's likely to be space beyond
 * message N, since allocb would have rounded up the required size to
 * one of the dblk_sizes.)
 */
static void
sbaddmsg(queue_t *rq, mblk_t *mp)
{
	struct sb *sbp;
	struct timeval t;
	struct sb_hdr hp;
	mblk_t *wrapper;	/* padding for msg N, header for msg N+1 */
	mblk_t *last;		/* last mblk of current message */
	size_t wrapperlen;	/* length of header + padding */
	size_t origlen;		/* data length before truncation */
	size_t pad;		/* bytes required to align header */

	sbp = (struct sb *)rq->q_ptr;

	origlen = msgdsize(mp);

	/*
	 * Truncate the message.
	 */
	if ((sbp->sb_snap > 0) && (origlen > sbp->sb_snap) &&
	    (adjmsg(mp, -(origlen - sbp->sb_snap)) == 1))
		hp.sbh_totlen = hp.sbh_msglen = sbp->sb_snap;
	else
		hp.sbh_totlen = hp.sbh_msglen = origlen;

	if (sbp->sb_flags & SB_NO_HEADER) {

		/*
		 * Would the inclusion of this message overflow the current
		 * chunk?  If so, close the chunk off and start a new one.
		 */
		if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
			sbclosechunk(sbp);

		/*
		 * First message too big for the chunk - just send it up.
		 * This will always be true when we're not chunking.
		 */
		if (hp.sbh_totlen > sbp->sb_chunk) {
			sbsendit(rq, mp);
			return;
		}

		/*
		 * We now know that the msg will fit in the chunk.
		 * Link it onto the end of the chunk.
		 * Since linkb() walks the entire chain, we keep a pointer
		 * to the first mblk of the last msg appended and call
		 * linkb on that last message, rather than performing the
		 * O(n) linkb() operation on the whole chain.
		 * sb_head isn't needed in this SB_NO_HEADER mode.
		 */
		if (sbp->sb_mp)
			linkb(sbp->sb_tail, mp);
		else
			sbp->sb_mp = mp;

		sbp->sb_tail = mp;
		sbp->sb_mlen += hp.sbh_totlen;
		sbp->sb_mcount++;
	} else {
		/* The timestamp must be taken immediately. */
		uniqtime(&t);
		TIMEVAL_TO_TIMEVAL32(&hp.sbh_timestamp, &t);

		pad = Align(hp.sbh_totlen);
		hp.sbh_totlen += sizeof (hp);
		hp.sbh_totlen += pad;

		/*
		 * Would the inclusion of this message overflow the current
		 * chunk?  If so, close the chunk off and start a new one.
		 */
		if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
			sbclosechunk(sbp);

		if (sbp->sb_head == NULL) {
			/* Allocate the leading header of a new chunk. */
			sbp->sb_head = allocb(sizeof (hp), BPRI_MED);
			if (sbp->sb_head == NULL) {
				/*
				 * Memory allocation failure.
				 * This will need to be revisited,
				 * since using certain flag combinations
				 * can result in messages being dropped
				 * silently.
				 */
				freemsg(mp);
				sbp->sb_drops++;
				return;
			}
			sbp->sb_mp = sbp->sb_head;
		}

		/*
		 * Copy the header into the message.
		 */
		hp.sbh_drops = sbp->sb_drops;
		hp.sbh_origlen = origlen;
		(void) memcpy(sbp->sb_head->b_wptr, (char *)&hp, sizeof (hp));
		sbp->sb_head->b_wptr += sizeof (hp);

		ASSERT(sbp->sb_head->b_wptr <= sbp->sb_head->b_datap->db_lim);

		/*
		 * Join the message to the chunk.
		 */
		linkb(sbp->sb_head, mp);

		sbp->sb_mcount++;
		sbp->sb_mlen += hp.sbh_totlen;

		/*
		 * If the first message alone is too big for the chunk,
		 * close the chunk now.
		 * If the next message would immediately cause the chunk
		 * to overflow we may as well close the chunk now.  The
		 * next message is certain to be at least SMALLEST_MESSAGE
		 * size.
		 */
		if (hp.sbh_totlen + SMALLEST_MESSAGE > sbp->sb_chunk) {
			sbclosechunk(sbp);
			return;
		}

		/*
		 * Find space for the wrapper.  The wrapper consists of:
		 *
		 * 1) Padding for this message (this is to ensure each
		 * header begins on an 8 byte boundary in the userland
		 * buffer).
		 *
		 * 2) Space for the next message's header, in case the
		 * next message will fit in this chunk.
		 *
		 * It may be possible to append the wrapper to the last
		 * mblk of the message, but only if we 'own' the data.  If
		 * the dblk has been shared through dupmsg() we mustn't
		 * alter it.
		 */
		wrapperlen = (sizeof (hp) + pad);

		/* Is there space for the wrapper beyond the message's data? */
		for (last = mp; last->b_cont; last = last->b_cont)
			;

		if ((wrapperlen <= MBLKTAIL(last)) &&
		    (last->b_datap->db_ref == 1)) {
			if (pad > 0) {
				/*
				 * Pad with zeroes to the next pointer
				 * boundary (we don't want to disclose
				 * kernel data to users), then advance wptr.
				 */
				(void) memset(last->b_wptr, 0, pad);
				last->b_wptr += pad;
			}
			/* Remember where to write the header information. */
			sbp->sb_head = last;
		} else {
			/* Have to allocate additional space for the wrapper */
			wrapper = allocb(wrapperlen, BPRI_MED);
			if (wrapper == NULL) {
				sbclosechunk(sbp);
				return;
			}
			if (pad > 0) {
				/*
				 * Pad with zeroes (we don't want to
				 * disclose kernel data to users).
				 */
				(void) memset(wrapper->b_wptr, 0, pad);
				wrapper->b_wptr += pad;
			}
			/* Link the wrapper msg onto the end of the chunk. */
			linkb(mp, wrapper);
			/* Remember to write the next header in this wrapper */
			sbp->sb_head = wrapper;
		}
	}
}
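
/*
 * The timeout-driven flush, end to end:
 *
 *	1. sbrput() starts a qtimeout() when read-side data arrives and
 *	   no timeout is pending.
 *	2. sbtick() fires and sends a zero-length M_CTL to the module's
 *	   own read-side put procedure with putctl(), re-arming the
 *	   timeout if the allocation fails.
 *	3. sbrput() recognizes the zero-length M_CTL and calls
 *	   sbclosechunk() to push the partial chunk upstream.
 *
 * Signaling the flush with a message, rather than manipulating the
 * chunk directly from the timeout routine, synchronizes it with the
 * module's other active threads, as noted at sbtick() below.
 */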

/*
 * Called from timeout().
 * Signal a timeout by passing a zero-length M_CTL msg up the read side
 * to synchronize with any active module threads (open, close, wput,
 * rput).
 */
static void
sbtick(void *arg)
{
	struct sb *sbp = arg;
	queue_t *rq;

	ASSERT(sbp);

	rq = sbp->sb_rq;
	sbp->sb_timeoutid = 0;		/* timeout has fired */

	if (putctl(rq, M_CTL) == 0)	/* failure */
		sbp->sb_timeoutid = qtimeout(rq, sbtick, sbp, sbp->sb_ticks);
}

/*
 * Close off the currently accumulating chunk and pass
 * it upward.  Takes care of resetting timers as well.
 *
 * This routine is called both directly and as a result
 * of the chunk timeout expiring.
 */
static void
sbclosechunk(struct sb *sbp)
{
	mblk_t *mp;
	queue_t *rq;

	ASSERT(sbp);

	if (sbp->sb_timeoutid) {
		(void) quntimeout(sbp->sb_rq, sbp->sb_timeoutid);
		sbp->sb_timeoutid = 0;
	}

	mp = sbp->sb_mp;
	rq = sbp->sb_rq;

	/*
	 * If there's currently a chunk in progress, close it off
	 * and try to send it up.
	 */
	if (mp) {
		sbsendit(rq, mp);
	}

	/*
	 * Clear the old chunk; we're ready for new messages.
	 */
	sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
	sbp->sb_mlen = 0;
	sbp->sb_mcount = 0;
	if (sbp->sb_flags & SB_DEFER_CHUNK)
		sbp->sb_state &= ~SB_FRCVD;
}

/*
 * Pass a chunk (or other message) upstream, honoring flow control
 * according to SB_NO_DROPS.
 */
static void
sbsendit(queue_t *rq, mblk_t *mp)
{
	struct sb *sbp = (struct sb *)rq->q_ptr;

	if (!canputnext(rq)) {
		if (sbp->sb_flags & SB_NO_DROPS)
			(void) putq(rq, mp);
		else {
			freemsg(mp);
			sbp->sb_drops += sbp->sb_mcount;
		}
		return;
	}

	/*
	 * If there are messages on the queue already, keep queueing
	 * them, since they need to be processed in order.
	 */
	if (qsize(rq) > 0) {
		/* should only get here if SB_NO_DROPS */
		(void) putq(rq, mp);
	} else
		putnext(rq, mp);
}