1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License, Version 1.0 only
   6  * (the "License").  You may not use this file except in compliance
   7  * with the License.
   8  *
   9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
  10  * or http://www.opensolaris.org/os/licensing.
  11  * See the License for the specific language governing permissions
  12  * and limitations under the License.
  13  *
  14  * When distributing Covered Code, include this CDDL HEADER in each
  15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  16  * If applicable, add the following below this CDDL HEADER, with the
  17  * fields enclosed by brackets "[]" replaced with your own identifying
  18  * information: Portions Copyright [yyyy] [name of copyright owner]
  19  *
  20  * CDDL HEADER END
  21  */
  22 /*
  23  * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
  24  * Use is subject to license terms.
  25  */
  26 
  27 /*
  28  * STREAMS Buffering module
  29  *
  30  * This streams module collects incoming messages from modules below
  31  * it on the stream and buffers them up into a smaller number of
  32  * aggregated messages.  Its main purpose is to reduce overhead by
  33  * cutting down on the number of read (or getmsg) calls its client
  34  * user process makes.
  35  *  - only M_DATA is buffered.
  36  *  - multithreading assumes configured as D_MTQPAIR
  37  *  - packets are lost only if flag SB_NO_HEADER is clear and buffer
  38  *    allocation fails.
  39  *  - in order message transmission. This is enforced for messages other
  40  *    than high priority messages.
  41  *  - zero length messages on the read side are not passed up the
  42  *    stream but used internally for synchronization.
  43  * FLAGS:
  44  * - SB_NO_PROTO_CVT - no conversion of M_PROTO messages to M_DATA.
  45  *   (conversion is the default for backwards compatibility
  46  *    hence the negative logic).
  47  * - SB_NO_HEADER - no headers in buffered data.
  48  *   (adding headers is the default for backwards compatibility
  49  *    hence the negative logic).
  50  * - SB_DEFER_CHUNK - provides improved response time in question-answer
  51  *   applications. Buffering is not enabled until the second message
  52  *   is received on the read side within the sb_ticks interval.
  53  *   This option will often be used in combination with flag SB_SEND_ON_WRITE.
  54  * - SB_SEND_ON_WRITE - a write message results in any pending buffered read
  55  *   data being immediately sent upstream.
  56  * - SB_NO_DROPS - bufmod behaves transparently in flow control and propagates
  57  *   the blocked flow condition downstream. If this flag is clear (default)
  58  *   messages will be dropped if the upstream flow is blocked.
  59  */
  60 
  61 
  62 #include        <sys/types.h>
  63 #include        <sys/errno.h>
  64 #include        <sys/debug.h>
  65 #include        <sys/stropts.h>
  66 #include        <sys/time.h>
  67 #include        <sys/stream.h>
  68 #include        <sys/conf.h>
  69 #include        <sys/ddi.h>
  70 #include        <sys/sunddi.h>
  71 #include        <sys/kmem.h>
  72 #include        <sys/strsun.h>
  73 #include        <sys/bufmod.h>
  74 #include        <sys/modctl.h>
  75 #include        <sys/isa_defs.h>
  76 
/*
 * Per-Stream state information.
 *
 * If sb_ticks is negative, we don't deliver chunks until they're
 * full.  If it's zero, we deliver every packet as it arrives.  (In
 * this case we force sb_chunk to zero, to make the implementation
 * easier.)  Otherwise, sb_ticks gives the number of ticks in a
 * buffering interval.  The interval begins when a read-side data
 * message is received and a timeout is not active.  If sb_snap is
 * zero, no truncation of the msg is done.
 *
 * One instance is allocated by sbopen(), stored in the q_ptr of both
 * the read and the write queue, and released by sbclose().
 */
struct sb {
        queue_t *sb_rq;         /* our rq */
        mblk_t  *sb_mp;         /* partial chunk */
        mblk_t  *sb_head;       /* pre-allocated space for the next header */
        mblk_t  *sb_tail;       /* first mblk of last message appended */
        uint_t  sb_mlen;        /* sb_mp length */
        uint_t  sb_mcount;      /* input msg count in sb_mp */
        uint_t  sb_chunk;       /* max chunk size */
        clock_t sb_ticks;       /* timeout interval (see block comment above) */
        timeout_id_t sb_timeoutid; /* qtimeout() id; 0 when none is pending */
        uint_t  sb_drops;       /* cumulative # discarded msgs */
        uint_t  sb_snap;        /* snapshot length; 0 means no truncation */
        uint_t  sb_flags;       /* flags field (SB_* option bits) */
        uint_t  sb_state;       /* state variable (e.g. SB_FRCVD) */
};
 103 
 104 /*
 105  * Function prototypes.
 106  */
 107 static  int     sbopen(queue_t *, dev_t *, int, int, cred_t *);
 108 static  int     sbclose(queue_t *, int, cred_t *);
 109 static  void    sbwput(queue_t *, mblk_t *);
 110 static  void    sbrput(queue_t *, mblk_t *);
 111 static  void    sbrsrv(queue_t *);
 112 static  void    sbioctl(queue_t *, mblk_t *);
 113 static  void    sbaddmsg(queue_t *, mblk_t *);
 114 static  void    sbtick(void *);
 115 static  void    sbclosechunk(struct sb *);
 116 static  void    sbsendit(queue_t *, mblk_t *);
 117 
/*
 * Module information referenced by both the read-side and the
 * write-side qinit structures below.
 */
static struct module_info       sb_minfo = {
        21,             /* mi_idnum */
        "bufmod",       /* mi_idname */
        0,              /* mi_minpsz */
        INFPSZ,         /* mi_maxpsz */
        1,              /* mi_hiwat */
        0               /* mi_lowat */
};
 126 
/*
 * Read-side queue initialization: sbrput() is the put procedure,
 * sbrsrv() the service procedure, and the open/close entry points for
 * the module live here.  sbrput() and sbrsrv() are declared as
 * returning void, hence the casts to the int (*)() slots.
 */
static struct qinit     sb_rinit = {
        (int (*)())sbrput,      /* qi_putp */
        (int (*)())sbrsrv,      /* qi_srvp */
        sbopen,                 /* qi_qopen */
        sbclose,                /* qi_qclose */
        NULL,                   /* qi_qadmin */
        &sb_minfo,          /* qi_minfo */
        NULL                    /* qi_mstat */
};
 136 
/*
 * Write-side queue initialization: only a put procedure (sbwput()) is
 * supplied; there is no write service procedure, and open/close are
 * left NULL on this side.
 */
static struct qinit     sb_winit = {
        (int (*)())sbwput,      /* qi_putp */
        NULL,                   /* qi_srvp */
        NULL,                   /* qi_qopen */
        NULL,                   /* qi_qclose */
        NULL,                   /* qi_qadmin */
        &sb_minfo,          /* qi_minfo */
        NULL                    /* qi_mstat */
};
 146 
/*
 * Streamtab tying the read and write qinit structures together.  The
 * multiplexor slots are NULL.
 */
static struct streamtab sb_info = {
        &sb_rinit,  /* st_rdinit */
        &sb_winit,  /* st_wrinit */
        NULL,           /* st_muxrinit */
        NULL            /* st_muxwinit */
};
 153 
 154 
/*
 * This is the loadable module wrapper.
 *
 * The fmodsw entry names the module "bufmod", points at its streamtab
 * and declares the D_MTQPAIR | D_MP flags.
 */

static struct fmodsw fsw = {
        "bufmod",
        &sb_info,
        D_MTQPAIR | D_MP
};
 164 
/*
 * Module linkage information for the kernel.
 *
 * Describes this object as a STREAMS module (mod_strmodops) with a
 * human-readable description string and the fmodsw entry above.
 */

static struct modlstrmod modlstrmod = {
        &mod_strmodops, "streams buffer mod", &fsw
};
 172 
/*
 * Top-level linkage handed to mod_install(), mod_remove() and
 * mod_info() by _init(), _fini() and _info(); it lists the single
 * modlstrmod element followed by a NULL terminator.
 */
static struct modlinkage modlinkage = {
        MODREV_1,  { &modlstrmod, NULL }
};
 176 
 177 
 178 int
 179 _init(void)
 180 {
 181         return (mod_install(&modlinkage));
 182 }
 183 
 184 int
 185 _fini(void)
 186 {
 187         return (mod_remove(&modlinkage));
 188 }
 189 
 190 int
 191 _info(struct modinfo *modinfop)
 192 {
 193         return (mod_info(&modlinkage, modinfop));
 194 }
 195 
 196 
 197 /* ARGSUSED */
 198 static int
 199 sbopen(queue_t *rq, dev_t *dev, int oflag, int sflag, cred_t *crp)
 200 {
 201         struct sb       *sbp;
 202         ASSERT(rq);
 203 
 204         if (sflag != MODOPEN)
 205                 return (EINVAL);
 206 
 207         if (rq->q_ptr)
 208                 return (0);
 209 
 210         /*
 211          * Allocate and initialize per-Stream structure.
 212          */
 213         sbp = kmem_alloc(sizeof (struct sb), KM_SLEEP);
 214         sbp->sb_rq = rq;
 215         sbp->sb_ticks = -1;
 216         sbp->sb_chunk = SB_DFLT_CHUNK;
 217         sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
 218         sbp->sb_mlen = 0;
 219         sbp->sb_mcount = 0;
 220         sbp->sb_timeoutid = 0;
 221         sbp->sb_drops = 0;
 222         sbp->sb_snap = 0;
 223         sbp->sb_flags = 0;
 224         sbp->sb_state = 0;
 225 
 226         rq->q_ptr = WR(rq)->q_ptr = sbp;
 227 
 228         qprocson(rq);
 229 
 230 
 231         return (0);
 232 }
 233 
 234 /* ARGSUSED1 */
 235 static int
 236 sbclose(queue_t *rq, int flag, cred_t *credp)
 237 {
 238         struct  sb      *sbp = (struct sb *)rq->q_ptr;
 239 
 240         ASSERT(sbp);
 241 
 242         qprocsoff(rq);
 243         /*
 244          * Cancel an outstanding timeout
 245          */
 246         if (sbp->sb_timeoutid != 0) {
 247                 (void) quntimeout(rq, sbp->sb_timeoutid);
 248                 sbp->sb_timeoutid = 0;
 249         }
 250         /*
 251          * Free the current chunk.
 252          */
 253         if (sbp->sb_mp) {
 254                 freemsg(sbp->sb_mp);
 255                 sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
 256                 sbp->sb_mlen = 0;
 257         }
 258 
 259         /*
 260          * Free the per-Stream structure.
 261          */
 262         kmem_free((caddr_t)sbp, sizeof (struct sb));
 263         rq->q_ptr = WR(rq)->q_ptr = NULL;
 264 
 265         return (0);
 266 }
 267 
 268 /*
 269  * the correction factor is introduced to compensate for
 270  * whatever assumptions the modules below have made about
 271  * how much traffic is flowing through the stream and the fact
 272  * that bufmod may be snipping messages with the sb_snap length.
 273  */
 274 #define SNIT_HIWAT(msgsize, fudge)      ((4 * msgsize * fudge) + 512)
 275 #define SNIT_LOWAT(msgsize, fudge)      ((2 * msgsize * fudge) + 256)
 276 
 277 
 278 static void
 279 sbioc(queue_t *wq, mblk_t *mp)
 280 {
 281         struct iocblk *iocp;
 282         struct sb *sbp = (struct sb *)wq->q_ptr;
 283         clock_t ticks;
 284         mblk_t  *mop;
 285 
 286         iocp = (struct iocblk *)mp->b_rptr;
 287 
 288         switch (iocp->ioc_cmd) {
 289         case SBIOCGCHUNK:
 290         case SBIOCGSNAP:
 291         case SBIOCGFLAGS:
 292         case SBIOCGTIME:
 293                 miocack(wq, mp, 0, 0);
 294                 return;
 295 
 296         case SBIOCSTIME:
 297 #ifdef _SYSCALL32_IMPL
 298                 if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
 299                         struct timeval32 *t32;
 300 
 301                         t32 = (struct timeval32 *)mp->b_cont->b_rptr;
 302                         if (t32->tv_sec < 0 || t32->tv_usec < 0) {
 303                                 miocnak(wq, mp, 0, EINVAL);
 304                                 break;
 305                         }
 306                         ticks = TIMEVAL_TO_TICK(t32);
 307                 } else
 308 #endif /* _SYSCALL32_IMPL */
 309                 {
 310                         struct timeval *tb;
 311 
 312                         tb = (struct timeval *)mp->b_cont->b_rptr;
 313 
 314                         if (tb->tv_sec < 0 || tb->tv_usec < 0) {
 315                                 miocnak(wq, mp, 0, EINVAL);
 316                                 break;
 317                         }
 318                         ticks = TIMEVAL_TO_TICK(tb);
 319                 }
 320                 sbp->sb_ticks = ticks;
 321                 if (ticks == 0)
 322                         sbp->sb_chunk = 0;
 323                 miocack(wq, mp, 0, 0);
 324                 sbclosechunk(sbp);
 325                 return;
 326 
 327         case SBIOCSCHUNK:
 328                 /*
 329                  * set up hi/lo water marks on stream head read queue.
 330                  * unlikely to run out of resources. Fix at later date.
 331                  */
 332                 if ((mop = allocb(sizeof (struct stroptions),
 333                     BPRI_MED)) != NULL) {
 334                         struct stroptions *sop;
 335                         uint_t chunk;
 336 
 337                         chunk = *(uint_t *)mp->b_cont->b_rptr;
 338                         mop->b_datap->db_type = M_SETOPTS;
 339                         mop->b_wptr += sizeof (struct stroptions);
 340                         sop = (struct stroptions *)mop->b_rptr;
 341                         sop->so_flags = SO_HIWAT | SO_LOWAT;
 342                         sop->so_hiwat = SNIT_HIWAT(chunk, 1);
 343                         sop->so_lowat = SNIT_LOWAT(chunk, 1);
 344                         qreply(wq, mop);
 345                 }
 346 
 347                 sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
 348                 miocack(wq, mp, 0, 0);
 349                 sbclosechunk(sbp);
 350                 return;
 351 
 352         case SBIOCSFLAGS:
 353                 sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
 354                 miocack(wq, mp, 0, 0);
 355                 return;
 356 
 357         case SBIOCSSNAP:
 358                 /*
 359                  * if chunking dont worry about effects of
 360                  * snipping of message size on head flow control
 361                  * since it has a relatively small bearing on the
 362                  * data rate onto the streamn head.
 363                  */
 364                 if (!sbp->sb_chunk) {
 365                         /*
 366                          * set up hi/lo water marks on stream head read queue.
 367                          * unlikely to run out of resources. Fix at later date.
 368                          */
 369                         if ((mop = allocb(sizeof (struct stroptions),
 370                             BPRI_MED)) != NULL) {
 371                                 struct stroptions *sop;
 372                                 uint_t snap;
 373                                 int fudge;
 374 
 375                                 snap = *(uint_t *)mp->b_cont->b_rptr;
 376                                 mop->b_datap->db_type = M_SETOPTS;
 377                                 mop->b_wptr += sizeof (struct stroptions);
 378                                 sop = (struct stroptions *)mop->b_rptr;
 379                                 sop->so_flags = SO_HIWAT | SO_LOWAT;
 380                                 fudge = snap <= 100 ?   4 :
 381                                     snap <= 400 ?   2 :
 382                                     1;
 383                                 sop->so_hiwat = SNIT_HIWAT(snap, fudge);
 384                                 sop->so_lowat = SNIT_LOWAT(snap, fudge);
 385                                 qreply(wq, mop);
 386                         }
 387                 }
 388 
 389                 sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;
 390                 miocack(wq, mp, 0, 0);
 391                 return;
 392 
 393         default:
 394                 ASSERT(0);
 395                 return;
 396         }
 397 }
 398 
 399 /*
 400  * Write-side put procedure.  Its main task is to detect ioctls
 401  * for manipulating the buffering state and hand them to sbioctl.
 402  * Other message types are passed on through.
 403  */
 404 static void
 405 sbwput(queue_t *wq, mblk_t *mp)
 406 {
 407         struct  sb      *sbp = (struct sb *)wq->q_ptr;
 408         struct copyresp *resp;
 409 
 410         if (sbp->sb_flags & SB_SEND_ON_WRITE)
 411                 sbclosechunk(sbp);
 412         switch (mp->b_datap->db_type) {
 413         case M_IOCTL:
 414                 sbioctl(wq, mp);
 415                 break;
 416 
 417         case M_IOCDATA:
 418                 resp = (struct copyresp *)mp->b_rptr;
 419                 if (resp->cp_rval) {
 420                         /*
 421                          * Just free message on failure.
 422                          */
 423                         freemsg(mp);
 424                         break;
 425                 }
 426 
 427                 switch (resp->cp_cmd) {
 428                 case SBIOCSTIME:
 429                 case SBIOCSCHUNK:
 430                 case SBIOCSFLAGS:
 431                 case SBIOCSSNAP:
 432                 case SBIOCGTIME:
 433                 case SBIOCGCHUNK:
 434                 case SBIOCGSNAP:
 435                 case SBIOCGFLAGS:
 436                         sbioc(wq, mp);
 437                         break;
 438 
 439                 default:
 440                         putnext(wq, mp);
 441                         break;
 442                 }
 443                 break;
 444 
 445         default:
 446                 putnext(wq, mp);
 447                 break;
 448         }
 449 }
 450 
 451 /*
 452  * Read-side put procedure.  It's responsible for buffering up incoming
 453  * messages and grouping them into aggregates according to the current
 454  * buffering parameters.
 455  */
 456 static void
 457 sbrput(queue_t *rq, mblk_t *mp)
 458 {
 459         struct  sb      *sbp = (struct sb *)rq->q_ptr;
 460 
 461         ASSERT(sbp);
 462 
 463         switch (mp->b_datap->db_type) {
 464         case M_PROTO:
 465                 if (sbp->sb_flags & SB_NO_PROTO_CVT) {
 466                         sbclosechunk(sbp);
 467                         sbsendit(rq, mp);
 468                         break;
 469                 } else {
 470                         /*
 471                          * Convert M_PROTO to M_DATA.
 472                          */
 473                         mp->b_datap->db_type = M_DATA;
 474                 }
 475                 /* FALLTHRU */
 476 
 477         case M_DATA:
 478                 if ((sbp->sb_flags & SB_DEFER_CHUNK) &&
 479                     !(sbp->sb_state & SB_FRCVD)) {
 480                         sbclosechunk(sbp);
 481                         sbsendit(rq, mp);
 482                         sbp->sb_state |= SB_FRCVD;
 483                 } else
 484                         sbaddmsg(rq, mp);
 485 
 486                 if ((sbp->sb_ticks > 0) && !(sbp->sb_timeoutid))
 487                         sbp->sb_timeoutid = qtimeout(sbp->sb_rq, sbtick,
 488                             sbp, sbp->sb_ticks);
 489 
 490                 break;
 491 
 492         case M_FLUSH:
 493                 if (*mp->b_rptr & FLUSHR) {
 494                         /*
 495                          * Reset timeout, flush the chunk currently in
 496                          * progress, and start a new chunk.
 497                          */
 498                         if (sbp->sb_timeoutid) {
 499                                 (void) quntimeout(sbp->sb_rq,
 500                                     sbp->sb_timeoutid);
 501                                 sbp->sb_timeoutid = 0;
 502                         }
 503                         if (sbp->sb_mp) {
 504                                 freemsg(sbp->sb_mp);
 505                                 sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
 506                                 sbp->sb_mlen = 0;
 507                                 sbp->sb_mcount = 0;
 508                         }
 509                         flushq(rq, FLUSHALL);
 510                 }
 511                 putnext(rq, mp);
 512                 break;
 513 
 514         case M_CTL:
 515                 /*
 516                  * Zero-length M_CTL means our timeout() popped.
 517                  */
 518                 if (MBLKL(mp) == 0) {
 519                         freemsg(mp);
 520                         sbclosechunk(sbp);
 521                 } else {
 522                         sbclosechunk(sbp);
 523                         sbsendit(rq, mp);
 524                 }
 525                 break;
 526 
 527         default:
 528                 if (mp->b_datap->db_type <= QPCTL) {
 529                         sbclosechunk(sbp);
 530                         sbsendit(rq, mp);
 531                 } else {
 532                         /* Note: out of band */
 533                         putnext(rq, mp);
 534                 }
 535                 break;
 536         }
 537 }
 538 
 539 /*
 540  *  read service procedure.
 541  */
 542 /* ARGSUSED */
 543 static void
 544 sbrsrv(queue_t *rq)
 545 {
 546         mblk_t  *mp;
 547 
 548         /*
 549          * High priority messages shouldn't get here but if
 550          * one does, jam it through to avoid infinite loop.
 551          */
 552         while ((mp = getq(rq)) != NULL) {
 553                 if (!canputnext(rq) && (mp->b_datap->db_type <= QPCTL)) {
 554                         /* should only get here if SB_NO_SROPS */
 555                         (void) putbq(rq, mp);
 556                         return;
 557                 }
 558                 putnext(rq, mp);
 559         }
 560 }
 561 
 562 /*
 563  * Handle write-side M_IOCTL messages.
 564  */
 565 static void
 566 sbioctl(queue_t *wq, mblk_t *mp)
 567 {
 568         struct  sb      *sbp = (struct sb *)wq->q_ptr;
 569         struct iocblk   *iocp = (struct iocblk *)mp->b_rptr;
 570         struct  timeval *t;
 571         clock_t ticks;
 572         mblk_t  *mop;
 573         int     transparent = iocp->ioc_count;
 574         mblk_t  *datamp;
 575         int     error;
 576 
 577         switch (iocp->ioc_cmd) {
 578         case SBIOCSTIME:
 579                 if (iocp->ioc_count == TRANSPARENT) {
 580 #ifdef _SYSCALL32_IMPL
 581                         if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
 582                                 mcopyin(mp, NULL, sizeof (struct timeval32),
 583                                     NULL);
 584                         } else
 585 #endif /* _SYSCALL32_IMPL */
 586                         {
 587                                 mcopyin(mp, NULL, sizeof (*t), NULL);
 588                         }
 589                         qreply(wq, mp);
 590                 } else {
 591                         /*
 592                          * Verify argument length.
 593                          */
 594 #ifdef _SYSCALL32_IMPL
 595                         if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
 596                                 struct timeval32 *t32;
 597 
 598                                 error = miocpullup(mp,
 599                                     sizeof (struct timeval32));
 600                                 if (error != 0) {
 601                                         miocnak(wq, mp, 0, error);
 602                                         break;
 603                                 }
 604                                 t32 = (struct timeval32 *)mp->b_cont->b_rptr;
 605                                 if (t32->tv_sec < 0 || t32->tv_usec < 0) {
 606                                         miocnak(wq, mp, 0, EINVAL);
 607                                         break;
 608                                 }
 609                                 ticks = TIMEVAL_TO_TICK(t32);
 610                         } else
 611 #endif /* _SYSCALL32_IMPL */
 612                         {
 613                                 error = miocpullup(mp, sizeof (struct timeval));
 614                                 if (error != 0) {
 615                                         miocnak(wq, mp, 0, error);
 616                                         break;
 617                                 }
 618 
 619                                 t = (struct timeval *)mp->b_cont->b_rptr;
 620                                 if (t->tv_sec < 0 || t->tv_usec < 0) {
 621                                         miocnak(wq, mp, 0, EINVAL);
 622                                         break;
 623                                 }
 624                                 ticks = TIMEVAL_TO_TICK(t);
 625                         }
 626                         sbp->sb_ticks = ticks;
 627                         if (ticks == 0)
 628                                 sbp->sb_chunk = 0;
 629                         miocack(wq, mp, 0, 0);
 630                         sbclosechunk(sbp);
 631                 }
 632                 break;
 633 
 634         case SBIOCGTIME: {
 635                 struct timeval *t;
 636 
 637                 /*
 638                  * Verify argument length.
 639                  */
 640                 if (transparent != TRANSPARENT) {
 641 #ifdef _SYSCALL32_IMPL
 642                         if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
 643                                 error = miocpullup(mp,
 644                                     sizeof (struct timeval32));
 645                                 if (error != 0) {
 646                                         miocnak(wq, mp, 0, error);
 647                                         break;
 648                                 }
 649                         } else
 650 #endif /* _SYSCALL32_IMPL */
 651                         error = miocpullup(mp, sizeof (struct timeval));
 652                         if (error != 0) {
 653                                 miocnak(wq, mp, 0, error);
 654                                 break;
 655                         }
 656                 }
 657 
 658                 /*
 659                  * If infinite timeout, return range error
 660                  * for the ioctl.
 661                  */
 662                 if (sbp->sb_ticks < 0) {
 663                         miocnak(wq, mp, 0, ERANGE);
 664                         break;
 665                 }
 666 
 667 #ifdef _SYSCALL32_IMPL
 668                 if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
 669                         struct timeval32 *t32;
 670 
 671                         if (transparent == TRANSPARENT) {
 672                                 datamp = allocb(sizeof (*t32), BPRI_MED);
 673                                 if (datamp == NULL) {
 674                                         miocnak(wq, mp, 0, EAGAIN);
 675                                         break;
 676                                 }
 677                                 mcopyout(mp, NULL, sizeof (*t32), NULL, datamp);
 678                         }
 679 
 680                         t32 = (struct timeval32 *)mp->b_cont->b_rptr;
 681                         TICK_TO_TIMEVAL32(sbp->sb_ticks, t32);
 682 
 683                         if (transparent == TRANSPARENT)
 684                                 qreply(wq, mp);
 685                         else
 686                                 miocack(wq, mp, sizeof (*t32), 0);
 687                 } else
 688 #endif /* _SYSCALL32_IMPL */
 689                 {
 690                         if (transparent == TRANSPARENT) {
 691                                 datamp = allocb(sizeof (*t), BPRI_MED);
 692                                 if (datamp == NULL) {
 693                                         miocnak(wq, mp, 0, EAGAIN);
 694                                         break;
 695                                 }
 696                                 mcopyout(mp, NULL, sizeof (*t), NULL, datamp);
 697                         }
 698 
 699                         t = (struct timeval *)mp->b_cont->b_rptr;
 700                         TICK_TO_TIMEVAL(sbp->sb_ticks, t);
 701 
 702                         if (transparent == TRANSPARENT)
 703                                 qreply(wq, mp);
 704                         else
 705                                 miocack(wq, mp, sizeof (*t), 0);
 706                 }
 707                 break;
 708         }
 709 
 710         case SBIOCCTIME:
 711                 sbp->sb_ticks = -1;
 712                 miocack(wq, mp, 0, 0);
 713                 break;
 714 
 715         case SBIOCSCHUNK:
 716                 if (iocp->ioc_count == TRANSPARENT) {
 717                         mcopyin(mp, NULL, sizeof (uint_t), NULL);
 718                         qreply(wq, mp);
 719                 } else {
 720                         /*
 721                          * Verify argument length.
 722                          */
 723                         error = miocpullup(mp, sizeof (uint_t));
 724                         if (error != 0) {
 725                                 miocnak(wq, mp, 0, error);
 726                                 break;
 727                         }
 728 
 729                         /*
 730                          * set up hi/lo water marks on stream head read queue.
 731                          * unlikely to run out of resources. Fix at later date.
 732                          */
 733                         if ((mop = allocb(sizeof (struct stroptions),
 734                             BPRI_MED)) != NULL) {
 735                                 struct stroptions *sop;
 736                                 uint_t chunk;
 737 
 738                                 chunk = *(uint_t *)mp->b_cont->b_rptr;
 739                                 mop->b_datap->db_type = M_SETOPTS;
 740                                 mop->b_wptr += sizeof (struct stroptions);
 741                                 sop = (struct stroptions *)mop->b_rptr;
 742                                 sop->so_flags = SO_HIWAT | SO_LOWAT;
 743                                 sop->so_hiwat = SNIT_HIWAT(chunk, 1);
 744                                 sop->so_lowat = SNIT_LOWAT(chunk, 1);
 745                                 qreply(wq, mop);
 746                         }
 747 
 748                         sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
 749                         miocack(wq, mp, 0, 0);
 750                         sbclosechunk(sbp);
 751                 }
 752                 break;
 753 
 754         case SBIOCGCHUNK:
 755                 /*
 756                  * Verify argument length.
 757                  */
 758                 if (transparent != TRANSPARENT) {
 759                         error = miocpullup(mp, sizeof (uint_t));
 760                         if (error != 0) {
 761                                 miocnak(wq, mp, 0, error);
 762                                 break;
 763                         }
 764                 }
 765 
 766                 if (transparent == TRANSPARENT) {
 767                         datamp = allocb(sizeof (uint_t), BPRI_MED);
 768                         if (datamp == NULL) {
 769                                 miocnak(wq, mp, 0, EAGAIN);
 770                                 break;
 771                         }
 772                         mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
 773                 }
 774 
 775                 *(uint_t *)mp->b_cont->b_rptr = sbp->sb_chunk;
 776 
 777                 if (transparent == TRANSPARENT)
 778                         qreply(wq, mp);
 779                 else
 780                         miocack(wq, mp, sizeof (uint_t), 0);
 781                 break;
 782 
 783         case SBIOCSSNAP:
 784                 if (iocp->ioc_count == TRANSPARENT) {
 785                         mcopyin(mp, NULL, sizeof (uint_t), NULL);
 786                         qreply(wq, mp);
 787                 } else {
 788                         /*
 789                          * Verify argument length.
 790                          */
 791                         error = miocpullup(mp, sizeof (uint_t));
 792                         if (error != 0) {
 793                                 miocnak(wq, mp, 0, error);
 794                                 break;
 795                         }
 796 
 797                         /*
 798                          * if chunking dont worry about effects of
 799                          * snipping of message size on head flow control
 800                          * since it has a relatively small bearing on the
 801                          * data rate onto the streamn head.
 802                          */
 803                         if (!sbp->sb_chunk) {
 804                                 /*
 805                                  * set up hi/lo water marks on stream
 806                                  * head read queue.  unlikely to run out
 807                                  * of resources. Fix at later date.
 808                                  */
 809                                 if ((mop = allocb(sizeof (struct stroptions),
 810                                     BPRI_MED)) != NULL) {
 811                                         struct stroptions *sop;
 812                                         uint_t snap;
 813                                         int fudge;
 814 
 815                                         snap = *(uint_t *)mp->b_cont->b_rptr;
 816                                         mop->b_datap->db_type = M_SETOPTS;
 817                                         mop->b_wptr += sizeof (*sop);
 818                                         sop = (struct stroptions *)mop->b_rptr;
 819                                         sop->so_flags = SO_HIWAT | SO_LOWAT;
 820                                         fudge = (snap <= 100) ? 4 :
 821                                             (snap <= 400) ? 2 : 1;
 822                                         sop->so_hiwat = SNIT_HIWAT(snap, fudge);
 823                                         sop->so_lowat = SNIT_LOWAT(snap, fudge);
 824                                         qreply(wq, mop);
 825                                 }
 826                         }
 827 
 828                         sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;
 829 
 830                         miocack(wq, mp, 0, 0);
 831                 }
 832                 break;
 833 
 834         case SBIOCGSNAP:
 835                 /*
 836                  * Verify argument length
 837                  */
 838                 if (transparent != TRANSPARENT) {
 839                         error = miocpullup(mp, sizeof (uint_t));
 840                         if (error != 0) {
 841                                 miocnak(wq, mp, 0, error);
 842                                 break;
 843                         }
 844                 }
 845 
 846                 if (transparent == TRANSPARENT) {
 847                         datamp = allocb(sizeof (uint_t), BPRI_MED);
 848                         if (datamp == NULL) {
 849                                 miocnak(wq, mp, 0, EAGAIN);
 850                                 break;
 851                         }
 852                         mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
 853                 }
 854 
 855                 *(uint_t *)mp->b_cont->b_rptr = sbp->sb_snap;
 856 
 857                 if (transparent == TRANSPARENT)
 858                         qreply(wq, mp);
 859                 else
 860                         miocack(wq, mp, sizeof (uint_t), 0);
 861                 break;
 862 
 863         case SBIOCSFLAGS:
 864                 /*
 865                  * set the flags.
 866                  */
 867                 if (iocp->ioc_count == TRANSPARENT) {
 868                         mcopyin(mp, NULL, sizeof (uint_t), NULL);
 869                         qreply(wq, mp);
 870                 } else {
 871                         error = miocpullup(mp, sizeof (uint_t));
 872                         if (error != 0) {
 873                                 miocnak(wq, mp, 0, error);
 874                                 break;
 875                         }
 876                         sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
 877                         miocack(wq, mp, 0, 0);
 878                 }
 879                 break;
 880 
 881         case SBIOCGFLAGS:
 882                 /*
 883                  * Verify argument length
 884                  */
 885                 if (transparent != TRANSPARENT) {
 886                         error = miocpullup(mp, sizeof (uint_t));
 887                         if (error != 0) {
 888                                 miocnak(wq, mp, 0, error);
 889                                 break;
 890                         }
 891                 }
 892 
 893                 if (transparent == TRANSPARENT) {
 894                         datamp = allocb(sizeof (uint_t), BPRI_MED);
 895                         if (datamp == NULL) {
 896                                 miocnak(wq, mp, 0, EAGAIN);
 897                                 break;
 898                         }
 899                         mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
 900                 }
 901 
 902                 *(uint_t *)mp->b_cont->b_rptr = sbp->sb_flags;
 903 
 904                 if (transparent == TRANSPARENT)
 905                         qreply(wq, mp);
 906                 else
 907                         miocack(wq, mp, sizeof (uint_t), 0);
 908                 break;
 909 
 910 
 911         default:
 912                 putnext(wq, mp);
 913                 break;
 914         }
 915 }
 916 
 917 /*
 918  * Given a length l, calculate the amount of extra storage
 919  * required to round it up to the next multiple of the alignment a.
 920  */
 921 #define RoundUpAmt(l, a)        ((l) % (a) ? (a) - ((l) % (a)) : 0)
 922 /*
 923  * Calculate additional amount of space required for alignment.
 924  */
 925 #define Align(l)                RoundUpAmt(l, sizeof (ulong_t))
 926 /*
 927  * Smallest possible message size when headers are enabled.
 928  * This is used to calculate whether a chunk is nearly full.
 929  */
 930 #define SMALLEST_MESSAGE        sizeof (struct sb_hdr) + _POINTER_ALIGNMENT
 931 
 932 /*
 933  * Process a read-side M_DATA message.
 934  *
 935  * If the currently accumulating chunk doesn't have enough room
 936  * for the message, close off the chunk, pass it upward, and start
 937  * a new one.  Then add the message to the current chunk, taking
 938  * account of the possibility that the message's size exceeds the
 939  * chunk size.
 940  *
 941  * If headers are enabled add an sb_hdr header and trailing alignment padding.
 942  *
 943  * To optimise performance the total number of msgbs should be kept
 944  * to a minimum. This is achieved by using any remaining space in message N
 945  * for both its own padding as well as the header of message N+1 if possible.
 946  * If there's insufficient space we allocate one message to hold this 'wrapper'.
 947  * (there's likely to be space beyond message N, since allocb would have
 948  * rounded up the required size to one of the dblk_sizes).
 949  *
 950  */
 951 static void
 952 sbaddmsg(queue_t *rq, mblk_t *mp)
 953 {
 954         struct sb       *sbp;
 955         struct timeval  t;
 956         struct sb_hdr   hp;
 957         mblk_t *wrapper;        /* padding for msg N, header for msg N+1 */
 958         mblk_t *last;           /* last mblk of current message */
 959         size_t wrapperlen;      /* length of header + padding */
 960         size_t origlen;         /* data length before truncation */
 961         size_t pad;             /* bytes required to align header */
 962 
 963         sbp = (struct sb *)rq->q_ptr;
 964 
 965         origlen = msgdsize(mp);
 966 
 967         /*
 968          * Truncate the message.
 969          */
 970         if ((sbp->sb_snap > 0) && (origlen > sbp->sb_snap) &&
 971                         (adjmsg(mp, -(origlen - sbp->sb_snap)) == 1))
 972                 hp.sbh_totlen = hp.sbh_msglen = sbp->sb_snap;
 973         else
 974                 hp.sbh_totlen = hp.sbh_msglen = origlen;
 975 
 976         if (sbp->sb_flags & SB_NO_HEADER) {
 977 
 978                 /*
 979                  * Would the inclusion of this message overflow the current
 980                  * chunk? If so close the chunk off and start a new one.
 981                  */
 982                 if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
 983                         sbclosechunk(sbp);
 984                 /*
 985                  * First message too big for chunk - just send it up.
 986                  * This will always be true when we're not chunking.
 987                  */
 988                 if (hp.sbh_totlen > sbp->sb_chunk) {
 989                         sbsendit(rq, mp);
 990                         return;
 991                 }
 992 
 993                 /*
 994                  * We now know that the msg will fit in the chunk.
 995                  * Link it onto the end of the chunk.
 996                  * Since linkb() walks the entire chain, we keep a pointer to
 997                  * the first mblk of the last msgb added and call linkb on that
 998                  * that last message, rather than performing the
 999                  * O(n) linkb() operation on the whole chain.
1000                  * sb_head isn't needed in this SB_NO_HEADER mode.
1001                  */
1002                 if (sbp->sb_mp)
1003                         linkb(sbp->sb_tail, mp);
1004                 else
1005                         sbp->sb_mp = mp;
1006 
1007                 sbp->sb_tail = mp;
1008                 sbp->sb_mlen += hp.sbh_totlen;
1009                 sbp->sb_mcount++;
1010         } else {
1011                 /* Timestamp must be done immediately */
1012                 uniqtime(&t);
1013                 TIMEVAL_TO_TIMEVAL32(&hp.sbh_timestamp, &t);
1014 
1015                 pad = Align(hp.sbh_totlen);
1016                 hp.sbh_totlen += sizeof (hp);
1017                 hp.sbh_totlen += pad;
1018 
1019                 /*
1020                  * Would the inclusion of this message overflow the current
1021                  * chunk? If so close the chunk off and start a new one.
1022                  */
1023                 if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
1024                                 sbclosechunk(sbp);
1025 
1026                 if (sbp->sb_head == NULL) {
1027                         /* Allocate leading header of new chunk */
1028                         sbp->sb_head = allocb(sizeof (hp), BPRI_MED);
1029                         if (sbp->sb_head == NULL) {
1030                                 /*
1031                                  * Memory allocation failure.
1032                                  * This will need to be revisited
1033                                  * since using certain flag combinations
1034                                  * can result in messages being dropped
1035                                  * silently.
1036                                  */
1037                                 freemsg(mp);
1038                                 sbp->sb_drops++;
1039                                 return;
1040                         }
1041                         sbp->sb_mp = sbp->sb_head;
1042                 }
1043 
1044                 /*
1045                  * Copy header into message
1046                  */
1047                 hp.sbh_drops = sbp->sb_drops;
1048                 hp.sbh_origlen = origlen;
1049                 (void) memcpy(sbp->sb_head->b_wptr, (char *)&hp, sizeof (hp));
1050                 sbp->sb_head->b_wptr += sizeof (hp);
1051 
1052                 ASSERT(sbp->sb_head->b_wptr <= sbp->sb_head->b_datap->db_lim);
1053 
1054                 /*
1055                  * Join message to the chunk
1056                  */
1057                 linkb(sbp->sb_head, mp);
1058 
1059                 sbp->sb_mcount++;
1060                 sbp->sb_mlen += hp.sbh_totlen;
1061 
1062                 /*
1063                  * If the first message alone is too big for the chunk close
1064                  * the chunk now.
1065                  * If the next message would immediately cause the chunk to
1066                  * overflow we may as well close the chunk now. The next
1067                  * message is certain to be at least SMALLEST_MESSAGE size.
1068                  */
1069                 if (hp.sbh_totlen + SMALLEST_MESSAGE > sbp->sb_chunk) {
1070                         sbclosechunk(sbp);
1071                         return;
1072                 }
1073 
1074                 /*
1075                  * Find space for the wrapper. The wrapper consists of:
1076                  *
1077                  * 1) Padding for this message (this is to ensure each header
1078                  * begins on an 8 byte boundary in the userland buffer).
1079                  *
1080                  * 2) Space for the next message's header, in case the next
1081                  * next message will fit in this chunk.
1082                  *
1083                  * It may be possible to append the wrapper to the last mblk
1084                  * of the message, but only if we 'own' the data. If the dblk
1085                  * has been shared through dupmsg() we mustn't alter it.
1086                  */
1087 
1088                 wrapperlen = (sizeof (hp) + pad);
1089 
1090                 /* Is there space for the wrapper beyond the message's data ? */
1091                 for (last = mp; last->b_cont; last = last->b_cont)
1092                         ;
1093 
1094                 if ((wrapperlen <= MBLKTAIL(last)) &&
1095                         (last->b_datap->db_ref == 1)) {
1096                         if (pad > 0) {
1097                                 /*
1098                                  * Pad with zeroes to the next pointer boundary
1099                                  * (we don't want to disclose kernel data to
1100                                  * users), then advance wptr.
1101                                  */
1102                                 (void) memset(last->b_wptr, 0, pad);
1103                                 last->b_wptr += pad;
1104                         }
1105                         /* Remember where to write the header information */
1106                         sbp->sb_head = last;
1107                 } else {
1108                         /* Have to allocate additional space for the wrapper */
1109                         wrapper = allocb(wrapperlen, BPRI_MED);
1110                         if (wrapper == NULL) {
1111                                 sbclosechunk(sbp);
1112                                 return;
1113                         }
1114                         if (pad > 0) {
1115                                 /*
1116                                  * Pad with zeroes (we don't want to disclose
1117                                  * kernel data to users).
1118                                  */
1119                                 (void) memset(wrapper->b_wptr, 0, pad);
1120                                 wrapper->b_wptr += pad;
1121                         }
1122                         /* Link the wrapper msg onto the end of the chunk */
1123                         linkb(mp, wrapper);
1124                         /* Remember to write the next header in this wrapper */
1125                         sbp->sb_head = wrapper;
1126                 }
1127         }
1128 }
1129 
1130 /*
1131  * Called from timeout().
1132  * Signal a timeout by passing a zero-length M_CTL msg in the read-side
1133  * to synchronize with any active module threads (open, close, wput, rput).
1134  */
1135 static void
1136 sbtick(void *arg)
1137 {
1138         struct sb *sbp = arg;
1139         queue_t *rq;
1140 
1141         ASSERT(sbp);
1142 
1143         rq = sbp->sb_rq;
1144         sbp->sb_timeoutid = 0;               /* timeout has fired */
1145 
1146         if (putctl(rq, M_CTL) == 0)     /* failure */
1147                 sbp->sb_timeoutid = qtimeout(rq, sbtick, sbp, sbp->sb_ticks);
1148 }
1149 
1150 /*
1151  * Close off the currently accumulating chunk and pass
1152  * it upward.  Takes care of resetting timers as well.
1153  *
1154  * This routine is called both directly and as a result
1155  * of the chunk timeout expiring.
1156  */
1157 static void
1158 sbclosechunk(struct sb *sbp)
1159 {
1160         mblk_t  *mp;
1161         queue_t *rq;
1162 
1163         ASSERT(sbp);
1164 
1165         if (sbp->sb_timeoutid) {
1166                 (void) quntimeout(sbp->sb_rq, sbp->sb_timeoutid);
1167                 sbp->sb_timeoutid = 0;
1168         }
1169 
1170         mp = sbp->sb_mp;
1171         rq = sbp->sb_rq;
1172 
1173         /*
1174          * If there's currently a chunk in progress, close it off
1175          * and try to send it up.
1176          */
1177         if (mp) {
1178                 sbsendit(rq, mp);
1179         }
1180 
1181         /*
1182          * Clear old chunk.  Ready for new msgs.
1183          */
1184         sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
1185         sbp->sb_mlen = 0;
1186         sbp->sb_mcount = 0;
1187         if (sbp->sb_flags & SB_DEFER_CHUNK)
1188                 sbp->sb_state &= ~SB_FRCVD;
1189 
1190 }
1191 
1192 static void
1193 sbsendit(queue_t *rq, mblk_t *mp)
1194 {
1195         struct  sb      *sbp = (struct sb *)rq->q_ptr;
1196 
1197         if (!canputnext(rq)) {
1198                 if (sbp->sb_flags & SB_NO_DROPS)
1199                         (void) putq(rq, mp);
1200                 else {
1201                         freemsg(mp);
1202                         sbp->sb_drops += sbp->sb_mcount;
1203                 }
1204                 return;
1205         }
1206         /*
1207          * If there are messages on the q already, keep
1208          * queueing them since they need to be processed in order.
1209          */
1210         if (qsize(rq) > 0) {
1211                 /* should only get here if SB_NO_DROPS */
1212                 (void) putq(rq, mp);
1213         }
1214         else
1215                 putnext(rq, mp);
1216 }