1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License, Version 1.0 only
   6  * (the "License").  You may not use this file except in compliance
   7  * with the License.
   8  *
   9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
  10  * or http://www.opensolaris.org/os/licensing.
  11  * See the License for the specific language governing permissions
  12  * and limitations under the License.
  13  *
  14  * When distributing Covered Code, include this CDDL HEADER in each
  15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  16  * If applicable, add the following below this CDDL HEADER, with the
  17  * fields enclosed by brackets "[]" replaced with your own identifying
  18  * information: Portions Copyright [yyyy] [name of copyright owner]
  19  *
  20  * CDDL HEADER END
  21  */
  22 /*
  23  * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
  24  * Use is subject to license terms.
  25  */
  26 
  27 #pragma ident   "%Z%%M% %I%     %E% SMI"
  28 
  29 /*
  30  * STREAMS Buffering module
  31  *
  32  * This streams module collects incoming messages from modules below
  33  * it on the stream and buffers them up into a smaller number of
  34  * aggregated messages.  Its main purpose is to reduce overhead by
  35  * cutting down on the number of read (or getmsg) calls its client
  36  * user process makes.
  37  *  - only M_DATA is buffered.
  38  *  - multithreading assumes configured as D_MTQPAIR
  39  *  - packets are lost only if flag SB_NO_HEADER is clear and buffer
  40  *    allocation fails.
  41  *  - in order message transmission. This is enforced for messages other
  42  *    than high priority messages.
  43  *  - zero length messages on the read side are not passed up the
  44  *    stream but used internally for synchronization.
  45  * FLAGS:
  46  * - SB_NO_PROTO_CVT - no conversion of M_PROTO messages to M_DATA.
  47  *   (conversion is the default for backwards compatibility
  48  *    hence the negative logic).
  49  * - SB_NO_HEADER - no headers in buffered data.
  50  *   (adding headers is the default for backwards compatibility
  51  *    hence the negative logic).
  52  * - SB_DEFER_CHUNK - provides improved response time in question-answer
  53  *   applications. Buffering is not enabled until the second message
  54  *   is received on the read side within the sb_ticks interval.
  55  *   This option will often be used in combination with flag SB_SEND_ON_WRITE.
  56  * - SB_SEND_ON_WRITE - a write message results in any pending buffered read
  57  *   data being immediately sent upstream.
  58  * - SB_NO_DROPS - bufmod behaves transparently in flow control and propagates
  59  *   the blocked flow condition downstream. If this flag is clear (default)
  60  *   messages will be dropped if the upstream flow is blocked.
  61  */
  62 
  63 
  64 #include        <sys/types.h>
  65 #include        <sys/errno.h>
  66 #include        <sys/debug.h>
  67 #include        <sys/stropts.h>
  68 #include        <sys/time.h>
  69 #include        <sys/stream.h>
  70 #include        <sys/conf.h>
  71 #include        <sys/ddi.h>
  72 #include        <sys/sunddi.h>
  73 #include        <sys/kmem.h>
  74 #include        <sys/strsun.h>
  75 #include        <sys/bufmod.h>
  76 #include        <sys/modctl.h>
  77 #include        <sys/isa_defs.h>
  78 
  79 /*
  80  * Per-Stream state information.
  81  *
  82  * If sb_ticks is negative, we don't deliver chunks until they're
  83  * full.  If it's zero, we deliver every packet as it arrives.  (In
  84  * this case we force sb_chunk to zero, to make the implementation
  85  * easier.)  Otherwise, sb_ticks gives the number of ticks in a
  86  * buffering interval. The interval begins when the a read side data
  87  * message is received and a timeout is not active. If sb_snap is
  88  * zero, no truncation of the msg is done.
  89  */
  90 struct sb {
  91         queue_t *sb_rq;         /* our rq */
  92         mblk_t  *sb_mp;         /* partial chunk */
  93         mblk_t  *sb_head;       /* pre-allocated space for the next header */
  94         mblk_t  *sb_tail;       /* first mblk of last message appended */
  95         uint_t  sb_mlen;        /* sb_mp length */
  96         uint_t  sb_mcount;      /* input msg count in sb_mp */
  97         uint_t  sb_chunk;       /* max chunk size */
  98         clock_t sb_ticks;       /* timeout interval */
  99         timeout_id_t sb_timeoutid; /* qtimeout() id */
 100         uint_t  sb_drops;       /* cumulative # discarded msgs */
 101         uint_t  sb_snap;        /* snapshot length */
 102         uint_t  sb_flags;       /* flags field */
 103         uint_t  sb_state;       /* state variable */
 104 };
 105 
 106 /*
 107  * Function prototypes.
 108  */
 109 static  int     sbopen(queue_t *, dev_t *, int, int, cred_t *);
 110 static  int     sbclose(queue_t *, int, cred_t *);
 111 static  void    sbwput(queue_t *, mblk_t *);
 112 static  void    sbrput(queue_t *, mblk_t *);
 113 static  void    sbrsrv(queue_t *);
 114 static  void    sbioctl(queue_t *, mblk_t *);
 115 static  void    sbaddmsg(queue_t *, mblk_t *);
 116 static  void    sbtick(void *);
 117 static  void    sbclosechunk(struct sb *);
 118 static  void    sbsendit(queue_t *, mblk_t *);
 119 
 120 static struct module_info       sb_minfo = {
 121         21,             /* mi_idnum */
 122         "bufmod",       /* mi_idname */
 123         0,              /* mi_minpsz */
 124         INFPSZ,         /* mi_maxpsz */
 125         1,              /* mi_hiwat */
 126         0               /* mi_lowat */
 127 };
 128 
 129 static struct qinit     sb_rinit = {
 130         (int (*)())sbrput,      /* qi_putp */
 131         (int (*)())sbrsrv,      /* qi_srvp */
 132         sbopen,                 /* qi_qopen */
 133         sbclose,                /* qi_qclose */
 134         NULL,                   /* qi_qadmin */
 135         &sb_minfo,          /* qi_minfo */
 136         NULL                    /* qi_mstat */
 137 };
 138 
 139 static struct qinit     sb_winit = {
 140         (int (*)())sbwput,      /* qi_putp */
 141         NULL,                   /* qi_srvp */
 142         NULL,                   /* qi_qopen */
 143         NULL,                   /* qi_qclose */
 144         NULL,                   /* qi_qadmin */
 145         &sb_minfo,          /* qi_minfo */
 146         NULL                    /* qi_mstat */
 147 };
 148 
 149 static struct streamtab sb_info = {
 150         &sb_rinit,  /* st_rdinit */
 151         &sb_winit,  /* st_wrinit */
 152         NULL,           /* st_muxrinit */
 153         NULL            /* st_muxwinit */
 154 };
 155 
 156 
 157 /*
 158  * This is the loadable module wrapper.
 159  */
 160 
 161 static struct fmodsw fsw = {
 162         "bufmod",
 163         &sb_info,
 164         D_MTQPAIR | D_MP
 165 };
 166 
 167 /*
 168  * Module linkage information for the kernel.
 169  */
 170 
 171 static struct modlstrmod modlstrmod = {
 172         &mod_strmodops, "streams buffer mod", &fsw
 173 };
 174 
 175 static struct modlinkage modlinkage = {
 176         MODREV_1, &modlstrmod, NULL
 177 };
 178 
 179 
 180 int
 181 _init(void)
 182 {
 183         return (mod_install(&modlinkage));
 184 }
 185 
 186 int
 187 _fini(void)
 188 {
 189         return (mod_remove(&modlinkage));
 190 }
 191 
 192 int
 193 _info(struct modinfo *modinfop)
 194 {
 195         return (mod_info(&modlinkage, modinfop));
 196 }
 197 
 198 
 199 /* ARGSUSED */
 200 static int
 201 sbopen(queue_t *rq, dev_t *dev, int oflag, int sflag, cred_t *crp)
 202 {
 203         struct sb       *sbp;
 204         ASSERT(rq);
 205 
 206         if (sflag != MODOPEN)
 207                 return (EINVAL);
 208 
 209         if (rq->q_ptr)
 210                 return (0);
 211 
 212         /*
 213          * Allocate and initialize per-Stream structure.
 214          */
 215         sbp = kmem_alloc(sizeof (struct sb), KM_SLEEP);
 216         sbp->sb_rq = rq;
 217         sbp->sb_ticks = -1;
 218         sbp->sb_chunk = SB_DFLT_CHUNK;
 219         sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
 220         sbp->sb_mlen = 0;
 221         sbp->sb_mcount = 0;
 222         sbp->sb_timeoutid = 0;
 223         sbp->sb_drops = 0;
 224         sbp->sb_snap = 0;
 225         sbp->sb_flags = 0;
 226         sbp->sb_state = 0;
 227 
 228         rq->q_ptr = WR(rq)->q_ptr = sbp;
 229 
 230         qprocson(rq);
 231 
 232 
 233         return (0);
 234 }
 235 
 236 /* ARGSUSED1 */
 237 static int
 238 sbclose(queue_t *rq, int flag, cred_t *credp)
 239 {
 240         struct  sb      *sbp = (struct sb *)rq->q_ptr;
 241 
 242         ASSERT(sbp);
 243 
 244         qprocsoff(rq);
 245         /*
 246          * Cancel an outstanding timeout
 247          */
 248         if (sbp->sb_timeoutid != 0) {
 249                 (void) quntimeout(rq, sbp->sb_timeoutid);
 250                 sbp->sb_timeoutid = 0;
 251         }
 252         /*
 253          * Free the current chunk.
 254          */
 255         if (sbp->sb_mp) {
 256                 freemsg(sbp->sb_mp);
 257                 sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
 258                 sbp->sb_mlen = 0;
 259         }
 260 
 261         /*
 262          * Free the per-Stream structure.
 263          */
 264         kmem_free((caddr_t)sbp, sizeof (struct sb));
 265         rq->q_ptr = WR(rq)->q_ptr = NULL;
 266 
 267         return (0);
 268 }
 269 
 270 /*
 271  * the correction factor is introduced to compensate for
 272  * whatever assumptions the modules below have made about
 273  * how much traffic is flowing through the stream and the fact
 274  * that bufmod may be snipping messages with the sb_snap length.
 275  */
 276 #define SNIT_HIWAT(msgsize, fudge)      ((4 * msgsize * fudge) + 512)
 277 #define SNIT_LOWAT(msgsize, fudge)      ((2 * msgsize * fudge) + 256)
 278 
 279 
 280 static void
 281 sbioc(queue_t *wq, mblk_t *mp)
 282 {
 283         struct iocblk *iocp;
 284         struct sb *sbp = (struct sb *)wq->q_ptr;
 285         clock_t ticks;
 286         mblk_t  *mop;
 287 
 288         iocp = (struct iocblk *)mp->b_rptr;
 289 
 290         switch (iocp->ioc_cmd) {
 291         case SBIOCGCHUNK:
 292         case SBIOCGSNAP:
 293         case SBIOCGFLAGS:
 294         case SBIOCGTIME:
 295                 miocack(wq, mp, 0, 0);
 296                 return;
 297 
 298         case SBIOCSTIME:
 299 #ifdef _SYSCALL32_IMPL
 300                 if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
 301                         struct timeval32 *t32;
 302 
 303                         t32 = (struct timeval32 *)mp->b_cont->b_rptr;
 304                         if (t32->tv_sec < 0 || t32->tv_usec < 0) {
 305                                 miocnak(wq, mp, 0, EINVAL);
 306                                 break;
 307                         }
 308                         ticks = TIMEVAL_TO_TICK(t32);
 309                 } else
 310 #endif /* _SYSCALL32_IMPL */
 311                 {
 312                         struct timeval *tb;
 313 
 314                         tb = (struct timeval *)mp->b_cont->b_rptr;
 315 
 316                         if (tb->tv_sec < 0 || tb->tv_usec < 0) {
 317                                 miocnak(wq, mp, 0, EINVAL);
 318                                 break;
 319                         }
 320                         ticks = TIMEVAL_TO_TICK(tb);
 321                 }
 322                 sbp->sb_ticks = ticks;
 323                 if (ticks == 0)
 324                         sbp->sb_chunk = 0;
 325                 miocack(wq, mp, 0, 0);
 326                 sbclosechunk(sbp);
 327                 return;
 328 
 329         case SBIOCSCHUNK:
 330                 /*
 331                  * set up hi/lo water marks on stream head read queue.
 332                  * unlikely to run out of resources. Fix at later date.
 333                  */
 334                 if ((mop = allocb(sizeof (struct stroptions),
 335                     BPRI_MED)) != NULL) {
 336                         struct stroptions *sop;
 337                         uint_t chunk;
 338 
 339                         chunk = *(uint_t *)mp->b_cont->b_rptr;
 340                         mop->b_datap->db_type = M_SETOPTS;
 341                         mop->b_wptr += sizeof (struct stroptions);
 342                         sop = (struct stroptions *)mop->b_rptr;
 343                         sop->so_flags = SO_HIWAT | SO_LOWAT;
 344                         sop->so_hiwat = SNIT_HIWAT(chunk, 1);
 345                         sop->so_lowat = SNIT_LOWAT(chunk, 1);
 346                         qreply(wq, mop);
 347                 }
 348 
 349                 sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
 350                 miocack(wq, mp, 0, 0);
 351                 sbclosechunk(sbp);
 352                 return;
 353 
 354         case SBIOCSFLAGS:
 355                 sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
 356                 miocack(wq, mp, 0, 0);
 357                 return;
 358 
 359         case SBIOCSSNAP:
 360                 /*
 361                  * if chunking dont worry about effects of
 362                  * snipping of message size on head flow control
 363                  * since it has a relatively small bearing on the
 364                  * data rate onto the streamn head.
 365                  */
 366                 if (!sbp->sb_chunk) {
 367                         /*
 368                          * set up hi/lo water marks on stream head read queue.
 369                          * unlikely to run out of resources. Fix at later date.
 370                          */
 371                         if ((mop = allocb(sizeof (struct stroptions),
 372                             BPRI_MED)) != NULL) {
 373                                 struct stroptions *sop;
 374                                 uint_t snap;
 375                                 int fudge;
 376 
 377                                 snap = *(uint_t *)mp->b_cont->b_rptr;
 378                                 mop->b_datap->db_type = M_SETOPTS;
 379                                 mop->b_wptr += sizeof (struct stroptions);
 380                                 sop = (struct stroptions *)mop->b_rptr;
 381                                 sop->so_flags = SO_HIWAT | SO_LOWAT;
 382                                 fudge = snap <= 100 ?   4 :
 383                                     snap <= 400 ?   2 :
 384                                     1;
 385                                 sop->so_hiwat = SNIT_HIWAT(snap, fudge);
 386                                 sop->so_lowat = SNIT_LOWAT(snap, fudge);
 387                                 qreply(wq, mop);
 388                         }
 389                 }
 390 
 391                 sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;
 392                 miocack(wq, mp, 0, 0);
 393                 return;
 394 
 395         default:
 396                 ASSERT(0);
 397                 return;
 398         }
 399 }
 400 
 401 /*
 402  * Write-side put procedure.  Its main task is to detect ioctls
 403  * for manipulating the buffering state and hand them to sbioctl.
 404  * Other message types are passed on through.
 405  */
 406 static void
 407 sbwput(queue_t *wq, mblk_t *mp)
 408 {
 409         struct  sb      *sbp = (struct sb *)wq->q_ptr;
 410         struct copyresp *resp;
 411 
 412         if (sbp->sb_flags & SB_SEND_ON_WRITE)
 413                 sbclosechunk(sbp);
 414         switch (mp->b_datap->db_type) {
 415         case M_IOCTL:
 416                 sbioctl(wq, mp);
 417                 break;
 418 
 419         case M_IOCDATA:
 420                 resp = (struct copyresp *)mp->b_rptr;
 421                 if (resp->cp_rval) {
 422                         /*
 423                          * Just free message on failure.
 424                          */
 425                         freemsg(mp);
 426                         break;
 427                 }
 428 
 429                 switch (resp->cp_cmd) {
 430                 case SBIOCSTIME:
 431                 case SBIOCSCHUNK:
 432                 case SBIOCSFLAGS:
 433                 case SBIOCSSNAP:
 434                 case SBIOCGTIME:
 435                 case SBIOCGCHUNK:
 436                 case SBIOCGSNAP:
 437                 case SBIOCGFLAGS:
 438                         sbioc(wq, mp);
 439                         break;
 440 
 441                 default:
 442                         putnext(wq, mp);
 443                         break;
 444                 }
 445                 break;
 446 
 447         default:
 448                 putnext(wq, mp);
 449                 break;
 450         }
 451 }
 452 
 453 /*
 454  * Read-side put procedure.  It's responsible for buffering up incoming
 455  * messages and grouping them into aggregates according to the current
 456  * buffering parameters.
 457  */
 458 static void
 459 sbrput(queue_t *rq, mblk_t *mp)
 460 {
 461         struct  sb      *sbp = (struct sb *)rq->q_ptr;
 462 
 463         ASSERT(sbp);
 464 
 465         switch (mp->b_datap->db_type) {
 466         case M_PROTO:
 467                 if (sbp->sb_flags & SB_NO_PROTO_CVT) {
 468                         sbclosechunk(sbp);
 469                         sbsendit(rq, mp);
 470                         break;
 471                 } else {
 472                         /*
 473                          * Convert M_PROTO to M_DATA.
 474                          */
 475                         mp->b_datap->db_type = M_DATA;
 476                 }
 477                 /* FALLTHRU */
 478 
 479         case M_DATA:
 480                 if ((sbp->sb_flags & SB_DEFER_CHUNK) &&
 481                     !(sbp->sb_state & SB_FRCVD)) {
 482                         sbclosechunk(sbp);
 483                         sbsendit(rq, mp);
 484                         sbp->sb_state |= SB_FRCVD;
 485                 } else
 486                         sbaddmsg(rq, mp);
 487 
 488                 if ((sbp->sb_ticks > 0) && !(sbp->sb_timeoutid))
 489                         sbp->sb_timeoutid = qtimeout(sbp->sb_rq, sbtick,
 490                             sbp, sbp->sb_ticks);
 491 
 492                 break;
 493 
 494         case M_FLUSH:
 495                 if (*mp->b_rptr & FLUSHR) {
 496                         /*
 497                          * Reset timeout, flush the chunk currently in
 498                          * progress, and start a new chunk.
 499                          */
 500                         if (sbp->sb_timeoutid) {
 501                                 (void) quntimeout(sbp->sb_rq,
 502                                     sbp->sb_timeoutid);
 503                                 sbp->sb_timeoutid = 0;
 504                         }
 505                         if (sbp->sb_mp) {
 506                                 freemsg(sbp->sb_mp);
 507                                 sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
 508                                 sbp->sb_mlen = 0;
 509                                 sbp->sb_mcount = 0;
 510                         }
 511                         flushq(rq, FLUSHALL);
 512                 }
 513                 putnext(rq, mp);
 514                 break;
 515 
 516         case M_CTL:
 517                 /*
 518                  * Zero-length M_CTL means our timeout() popped.
 519                  */
 520                 if (MBLKL(mp) == 0) {
 521                         freemsg(mp);
 522                         sbclosechunk(sbp);
 523                 } else {
 524                         sbclosechunk(sbp);
 525                         sbsendit(rq, mp);
 526                 }
 527                 break;
 528 
 529         default:
 530                 if (mp->b_datap->db_type <= QPCTL) {
 531                         sbclosechunk(sbp);
 532                         sbsendit(rq, mp);
 533                 } else {
 534                         /* Note: out of band */
 535                         putnext(rq, mp);
 536                 }
 537                 break;
 538         }
 539 }
 540 
 541 /*
 542  *  read service procedure.
 543  */
 544 /* ARGSUSED */
 545 static void
 546 sbrsrv(queue_t *rq)
 547 {
 548         mblk_t  *mp;
 549 
 550         /*
 551          * High priority messages shouldn't get here but if
 552          * one does, jam it through to avoid infinite loop.
 553          */
 554         while ((mp = getq(rq)) != NULL) {
 555                 if (!canputnext(rq) && (mp->b_datap->db_type <= QPCTL)) {
 556                         /* should only get here if SB_NO_SROPS */
 557                         (void) putbq(rq, mp);
 558                         return;
 559                 }
 560                 putnext(rq, mp);
 561         }
 562 }
 563 
 564 /*
 565  * Handle write-side M_IOCTL messages.
 566  */
 567 static void
 568 sbioctl(queue_t *wq, mblk_t *mp)
 569 {
 570         struct  sb      *sbp = (struct sb *)wq->q_ptr;
 571         struct iocblk   *iocp = (struct iocblk *)mp->b_rptr;
 572         struct  timeval *t;
 573         clock_t ticks;
 574         mblk_t  *mop;
 575         int     transparent = iocp->ioc_count;
 576         mblk_t  *datamp;
 577         int     error;
 578 
 579         switch (iocp->ioc_cmd) {
 580         case SBIOCSTIME:
 581                 if (iocp->ioc_count == TRANSPARENT) {
 582 #ifdef _SYSCALL32_IMPL
 583                         if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
 584                                 mcopyin(mp, NULL, sizeof (struct timeval32),
 585                                     NULL);
 586                         } else
 587 #endif /* _SYSCALL32_IMPL */
 588                         {
 589                                 mcopyin(mp, NULL, sizeof (*t), NULL);
 590                         }
 591                         qreply(wq, mp);
 592                 } else {
 593                         /*
 594                          * Verify argument length.
 595                          */
 596 #ifdef _SYSCALL32_IMPL
 597                         if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
 598                                 struct timeval32 *t32;
 599 
 600                                 error = miocpullup(mp,
 601                                     sizeof (struct timeval32));
 602                                 if (error != 0) {
 603                                         miocnak(wq, mp, 0, error);
 604                                         break;
 605                                 }
 606                                 t32 = (struct timeval32 *)mp->b_cont->b_rptr;
 607                                 if (t32->tv_sec < 0 || t32->tv_usec < 0) {
 608                                         miocnak(wq, mp, 0, EINVAL);
 609                                         break;
 610                                 }
 611                                 ticks = TIMEVAL_TO_TICK(t32);
 612                         } else
 613 #endif /* _SYSCALL32_IMPL */
 614                         {
 615                                 error = miocpullup(mp, sizeof (struct timeval));
 616                                 if (error != 0) {
 617                                         miocnak(wq, mp, 0, error);
 618                                         break;
 619                                 }
 620 
 621                                 t = (struct timeval *)mp->b_cont->b_rptr;
 622                                 if (t->tv_sec < 0 || t->tv_usec < 0) {
 623                                         miocnak(wq, mp, 0, EINVAL);
 624                                         break;
 625                                 }
 626                                 ticks = TIMEVAL_TO_TICK(t);
 627                         }
 628                         sbp->sb_ticks = ticks;
 629                         if (ticks == 0)
 630                                 sbp->sb_chunk = 0;
 631                         miocack(wq, mp, 0, 0);
 632                         sbclosechunk(sbp);
 633                 }
 634                 break;
 635 
 636         case SBIOCGTIME: {
 637                 struct timeval *t;
 638 
 639                 /*
 640                  * Verify argument length.
 641                  */
 642                 if (transparent != TRANSPARENT) {
 643 #ifdef _SYSCALL32_IMPL
 644                         if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
 645                                 error = miocpullup(mp,
 646                                     sizeof (struct timeval32));
 647                                 if (error != 0) {
 648                                         miocnak(wq, mp, 0, error);
 649                                         break;
 650                                 }
 651                         } else
 652 #endif /* _SYSCALL32_IMPL */
 653                         error = miocpullup(mp, sizeof (struct timeval));
 654                         if (error != 0) {
 655                                 miocnak(wq, mp, 0, error);
 656                                 break;
 657                         }
 658                 }
 659 
 660                 /*
 661                  * If infinite timeout, return range error
 662                  * for the ioctl.
 663                  */
 664                 if (sbp->sb_ticks < 0) {
 665                         miocnak(wq, mp, 0, ERANGE);
 666                         break;
 667                 }
 668 
 669 #ifdef _SYSCALL32_IMPL
 670                 if ((iocp->ioc_flag & IOC_MODELS) != IOC_NATIVE) {
 671                         struct timeval32 *t32;
 672 
 673                         if (transparent == TRANSPARENT) {
 674                                 datamp = allocb(sizeof (*t32), BPRI_MED);
 675                                 if (datamp == NULL) {
 676                                         miocnak(wq, mp, 0, EAGAIN);
 677                                         break;
 678                                 }
 679                                 mcopyout(mp, NULL, sizeof (*t32), NULL, datamp);
 680                         }
 681 
 682                         t32 = (struct timeval32 *)mp->b_cont->b_rptr;
 683                         TICK_TO_TIMEVAL32(sbp->sb_ticks, t32);
 684 
 685                         if (transparent == TRANSPARENT)
 686                                 qreply(wq, mp);
 687                         else
 688                                 miocack(wq, mp, sizeof (*t32), 0);
 689                 } else
 690 #endif /* _SYSCALL32_IMPL */
 691                 {
 692                         if (transparent == TRANSPARENT) {
 693                                 datamp = allocb(sizeof (*t), BPRI_MED);
 694                                 if (datamp == NULL) {
 695                                         miocnak(wq, mp, 0, EAGAIN);
 696                                         break;
 697                                 }
 698                                 mcopyout(mp, NULL, sizeof (*t), NULL, datamp);
 699                         }
 700 
 701                         t = (struct timeval *)mp->b_cont->b_rptr;
 702                         TICK_TO_TIMEVAL(sbp->sb_ticks, t);
 703 
 704                         if (transparent == TRANSPARENT)
 705                                 qreply(wq, mp);
 706                         else
 707                                 miocack(wq, mp, sizeof (*t), 0);
 708                 }
 709                 break;
 710         }
 711 
 712         case SBIOCCTIME:
 713                 sbp->sb_ticks = -1;
 714                 miocack(wq, mp, 0, 0);
 715                 break;
 716 
 717         case SBIOCSCHUNK:
 718                 if (iocp->ioc_count == TRANSPARENT) {
 719                         mcopyin(mp, NULL, sizeof (uint_t), NULL);
 720                         qreply(wq, mp);
 721                 } else {
 722                         /*
 723                          * Verify argument length.
 724                          */
 725                         error = miocpullup(mp, sizeof (uint_t));
 726                         if (error != 0) {
 727                                 miocnak(wq, mp, 0, error);
 728                                 break;
 729                         }
 730 
 731                         /*
 732                          * set up hi/lo water marks on stream head read queue.
 733                          * unlikely to run out of resources. Fix at later date.
 734                          */
 735                         if ((mop = allocb(sizeof (struct stroptions),
 736                             BPRI_MED)) != NULL) {
 737                                 struct stroptions *sop;
 738                                 uint_t chunk;
 739 
 740                                 chunk = *(uint_t *)mp->b_cont->b_rptr;
 741                                 mop->b_datap->db_type = M_SETOPTS;
 742                                 mop->b_wptr += sizeof (struct stroptions);
 743                                 sop = (struct stroptions *)mop->b_rptr;
 744                                 sop->so_flags = SO_HIWAT | SO_LOWAT;
 745                                 sop->so_hiwat = SNIT_HIWAT(chunk, 1);
 746                                 sop->so_lowat = SNIT_LOWAT(chunk, 1);
 747                                 qreply(wq, mop);
 748                         }
 749 
 750                         sbp->sb_chunk = *(uint_t *)mp->b_cont->b_rptr;
 751                         miocack(wq, mp, 0, 0);
 752                         sbclosechunk(sbp);
 753                 }
 754                 break;
 755 
 756         case SBIOCGCHUNK:
 757                 /*
 758                  * Verify argument length.
 759                  */
 760                 if (transparent != TRANSPARENT) {
 761                         error = miocpullup(mp, sizeof (uint_t));
 762                         if (error != 0) {
 763                                 miocnak(wq, mp, 0, error);
 764                                 break;
 765                         }
 766                 }
 767 
 768                 if (transparent == TRANSPARENT) {
 769                         datamp = allocb(sizeof (uint_t), BPRI_MED);
 770                         if (datamp == NULL) {
 771                                 miocnak(wq, mp, 0, EAGAIN);
 772                                 break;
 773                         }
 774                         mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
 775                 }
 776 
 777                 *(uint_t *)mp->b_cont->b_rptr = sbp->sb_chunk;
 778 
 779                 if (transparent == TRANSPARENT)
 780                         qreply(wq, mp);
 781                 else
 782                         miocack(wq, mp, sizeof (uint_t), 0);
 783                 break;
 784 
 785         case SBIOCSSNAP:
 786                 if (iocp->ioc_count == TRANSPARENT) {
 787                         mcopyin(mp, NULL, sizeof (uint_t), NULL);
 788                         qreply(wq, mp);
 789                 } else {
 790                         /*
 791                          * Verify argument length.
 792                          */
 793                         error = miocpullup(mp, sizeof (uint_t));
 794                         if (error != 0) {
 795                                 miocnak(wq, mp, 0, error);
 796                                 break;
 797                         }
 798 
 799                         /*
 800                          * if chunking dont worry about effects of
 801                          * snipping of message size on head flow control
 802                          * since it has a relatively small bearing on the
 803                          * data rate onto the streamn head.
 804                          */
 805                         if (!sbp->sb_chunk) {
 806                                 /*
 807                                  * set up hi/lo water marks on stream
 808                                  * head read queue.  unlikely to run out
 809                                  * of resources. Fix at later date.
 810                                  */
 811                                 if ((mop = allocb(sizeof (struct stroptions),
 812                                     BPRI_MED)) != NULL) {
 813                                         struct stroptions *sop;
 814                                         uint_t snap;
 815                                         int fudge;
 816 
 817                                         snap = *(uint_t *)mp->b_cont->b_rptr;
 818                                         mop->b_datap->db_type = M_SETOPTS;
 819                                         mop->b_wptr += sizeof (*sop);
 820                                         sop = (struct stroptions *)mop->b_rptr;
 821                                         sop->so_flags = SO_HIWAT | SO_LOWAT;
 822                                         fudge = (snap <= 100) ? 4 :
 823                                             (snap <= 400) ? 2 : 1;
 824                                         sop->so_hiwat = SNIT_HIWAT(snap, fudge);
 825                                         sop->so_lowat = SNIT_LOWAT(snap, fudge);
 826                                         qreply(wq, mop);
 827                                 }
 828                         }
 829 
 830                         sbp->sb_snap = *(uint_t *)mp->b_cont->b_rptr;
 831 
 832                         miocack(wq, mp, 0, 0);
 833                 }
 834                 break;
 835 
 836         case SBIOCGSNAP:
 837                 /*
 838                  * Verify argument length
 839                  */
 840                 if (transparent != TRANSPARENT) {
 841                         error = miocpullup(mp, sizeof (uint_t));
 842                         if (error != 0) {
 843                                 miocnak(wq, mp, 0, error);
 844                                 break;
 845                         }
 846                 }
 847 
 848                 if (transparent == TRANSPARENT) {
 849                         datamp = allocb(sizeof (uint_t), BPRI_MED);
 850                         if (datamp == NULL) {
 851                                 miocnak(wq, mp, 0, EAGAIN);
 852                                 break;
 853                         }
 854                         mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
 855                 }
 856 
 857                 *(uint_t *)mp->b_cont->b_rptr = sbp->sb_snap;
 858 
 859                 if (transparent == TRANSPARENT)
 860                         qreply(wq, mp);
 861                 else
 862                         miocack(wq, mp, sizeof (uint_t), 0);
 863                 break;
 864 
 865         case SBIOCSFLAGS:
 866                 /*
 867                  * set the flags.
 868                  */
 869                 if (iocp->ioc_count == TRANSPARENT) {
 870                         mcopyin(mp, NULL, sizeof (uint_t), NULL);
 871                         qreply(wq, mp);
 872                 } else {
 873                         error = miocpullup(mp, sizeof (uint_t));
 874                         if (error != 0) {
 875                                 miocnak(wq, mp, 0, error);
 876                                 break;
 877                         }
 878                         sbp->sb_flags = *(uint_t *)mp->b_cont->b_rptr;
 879                         miocack(wq, mp, 0, 0);
 880                 }
 881                 break;
 882 
 883         case SBIOCGFLAGS:
 884                 /*
 885                  * Verify argument length
 886                  */
 887                 if (transparent != TRANSPARENT) {
 888                         error = miocpullup(mp, sizeof (uint_t));
 889                         if (error != 0) {
 890                                 miocnak(wq, mp, 0, error);
 891                                 break;
 892                         }
 893                 }
 894 
 895                 if (transparent == TRANSPARENT) {
 896                         datamp = allocb(sizeof (uint_t), BPRI_MED);
 897                         if (datamp == NULL) {
 898                                 miocnak(wq, mp, 0, EAGAIN);
 899                                 break;
 900                         }
 901                         mcopyout(mp, NULL, sizeof (uint_t), NULL, datamp);
 902                 }
 903 
 904                 *(uint_t *)mp->b_cont->b_rptr = sbp->sb_flags;
 905 
 906                 if (transparent == TRANSPARENT)
 907                         qreply(wq, mp);
 908                 else
 909                         miocack(wq, mp, sizeof (uint_t), 0);
 910                 break;
 911 
 912 
 913         default:
 914                 putnext(wq, mp);
 915                 break;
 916         }
 917 }
 918 
 919 /*
 920  * Given a length l, calculate the amount of extra storage
 921  * required to round it up to the next multiple of the alignment a.
 922  */
 923 #define RoundUpAmt(l, a)        ((l) % (a) ? (a) - ((l) % (a)) : 0)
 924 /*
 925  * Calculate additional amount of space required for alignment.
 926  */
 927 #define Align(l)                RoundUpAmt(l, sizeof (ulong_t))
 928 /*
 929  * Smallest possible message size when headers are enabled.
 930  * This is used to calculate whether a chunk is nearly full.
 931  */
 932 #define SMALLEST_MESSAGE        sizeof (struct sb_hdr) + _POINTER_ALIGNMENT
 933 
 934 /*
 935  * Process a read-side M_DATA message.
 936  *
 937  * If the currently accumulating chunk doesn't have enough room
 938  * for the message, close off the chunk, pass it upward, and start
 939  * a new one.  Then add the message to the current chunk, taking
 940  * account of the possibility that the message's size exceeds the
 941  * chunk size.
 942  *
 943  * If headers are enabled add an sb_hdr header and trailing alignment padding.
 944  *
 945  * To optimise performance the total number of msgbs should be kept
 946  * to a minimum. This is achieved by using any remaining space in message N
 947  * for both its own padding as well as the header of message N+1 if possible.
 948  * If there's insufficient space we allocate one message to hold this 'wrapper'.
 949  * (there's likely to be space beyond message N, since allocb would have
 950  * rounded up the required size to one of the dblk_sizes).
 951  *
 952  */
 953 static void
 954 sbaddmsg(queue_t *rq, mblk_t *mp)
 955 {
 956         struct sb       *sbp;
 957         struct timeval  t;
 958         struct sb_hdr   hp;
 959         mblk_t *wrapper;        /* padding for msg N, header for msg N+1 */
 960         mblk_t *last;           /* last mblk of current message */
 961         size_t wrapperlen;      /* length of header + padding */
 962         size_t origlen;         /* data length before truncation */
 963         size_t pad;             /* bytes required to align header */
 964 
 965         sbp = (struct sb *)rq->q_ptr;
 966 
 967         origlen = msgdsize(mp);
 968 
 969         /*
 970          * Truncate the message.
 971          */
 972         if ((sbp->sb_snap > 0) && (origlen > sbp->sb_snap) &&
 973                         (adjmsg(mp, -(origlen - sbp->sb_snap)) == 1))
 974                 hp.sbh_totlen = hp.sbh_msglen = sbp->sb_snap;
 975         else
 976                 hp.sbh_totlen = hp.sbh_msglen = origlen;
 977 
 978         if (sbp->sb_flags & SB_NO_HEADER) {
 979 
 980                 /*
 981                  * Would the inclusion of this message overflow the current
 982                  * chunk? If so close the chunk off and start a new one.
 983                  */
 984                 if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
 985                         sbclosechunk(sbp);
 986                 /*
 987                  * First message too big for chunk - just send it up.
 988                  * This will always be true when we're not chunking.
 989                  */
 990                 if (hp.sbh_totlen > sbp->sb_chunk) {
 991                         sbsendit(rq, mp);
 992                         return;
 993                 }
 994 
 995                 /*
 996                  * We now know that the msg will fit in the chunk.
 997                  * Link it onto the end of the chunk.
 998                  * Since linkb() walks the entire chain, we keep a pointer to
 999                  * the first mblk of the last msgb added and call linkb on that
1000                  * that last message, rather than performing the
1001                  * O(n) linkb() operation on the whole chain.
1002                  * sb_head isn't needed in this SB_NO_HEADER mode.
1003                  */
1004                 if (sbp->sb_mp)
1005                         linkb(sbp->sb_tail, mp);
1006                 else
1007                         sbp->sb_mp = mp;
1008 
1009                 sbp->sb_tail = mp;
1010                 sbp->sb_mlen += hp.sbh_totlen;
1011                 sbp->sb_mcount++;
1012         } else {
1013                 /* Timestamp must be done immediately */
1014                 uniqtime(&t);
1015                 TIMEVAL_TO_TIMEVAL32(&hp.sbh_timestamp, &t);
1016 
1017                 pad = Align(hp.sbh_totlen);
1018                 hp.sbh_totlen += sizeof (hp);
1019                 hp.sbh_totlen += pad;
1020 
1021                 /*
1022                  * Would the inclusion of this message overflow the current
1023                  * chunk? If so close the chunk off and start a new one.
1024                  */
1025                 if ((hp.sbh_totlen + sbp->sb_mlen) > sbp->sb_chunk)
1026                                 sbclosechunk(sbp);
1027 
1028                 if (sbp->sb_head == NULL) {
1029                         /* Allocate leading header of new chunk */
1030                         sbp->sb_head = allocb(sizeof (hp), BPRI_MED);
1031                         if (sbp->sb_head == NULL) {
1032                                 /*
1033                                  * Memory allocation failure.
1034                                  * This will need to be revisited
1035                                  * since using certain flag combinations
1036                                  * can result in messages being dropped
1037                                  * silently.
1038                                  */
1039                                 freemsg(mp);
1040                                 sbp->sb_drops++;
1041                                 return;
1042                         }
1043                         sbp->sb_mp = sbp->sb_head;
1044                 }
1045 
1046                 /*
1047                  * Copy header into message
1048                  */
1049                 hp.sbh_drops = sbp->sb_drops;
1050                 hp.sbh_origlen = origlen;
1051                 (void) memcpy(sbp->sb_head->b_wptr, (char *)&hp, sizeof (hp));
1052                 sbp->sb_head->b_wptr += sizeof (hp);
1053 
1054                 ASSERT(sbp->sb_head->b_wptr <= sbp->sb_head->b_datap->db_lim);
1055 
1056                 /*
1057                  * Join message to the chunk
1058                  */
1059                 linkb(sbp->sb_head, mp);
1060 
1061                 sbp->sb_mcount++;
1062                 sbp->sb_mlen += hp.sbh_totlen;
1063 
1064                 /*
1065                  * If the first message alone is too big for the chunk close
1066                  * the chunk now.
1067                  * If the next message would immediately cause the chunk to
1068                  * overflow we may as well close the chunk now. The next
1069                  * message is certain to be at least SMALLEST_MESSAGE size.
1070                  */
1071                 if (hp.sbh_totlen + SMALLEST_MESSAGE > sbp->sb_chunk) {
1072                         sbclosechunk(sbp);
1073                         return;
1074                 }
1075 
1076                 /*
1077                  * Find space for the wrapper. The wrapper consists of:
1078                  *
1079                  * 1) Padding for this message (this is to ensure each header
1080                  * begins on an 8 byte boundary in the userland buffer).
1081                  *
1082                  * 2) Space for the next message's header, in case the next
1083                  * next message will fit in this chunk.
1084                  *
1085                  * It may be possible to append the wrapper to the last mblk
1086                  * of the message, but only if we 'own' the data. If the dblk
1087                  * has been shared through dupmsg() we mustn't alter it.
1088                  */
1089 
1090                 wrapperlen = (sizeof (hp) + pad);
1091 
1092                 /* Is there space for the wrapper beyond the message's data ? */
1093                 for (last = mp; last->b_cont; last = last->b_cont)
1094                         ;
1095 
1096                 if ((wrapperlen <= MBLKTAIL(last)) &&
1097                         (last->b_datap->db_ref == 1)) {
1098                         if (pad > 0) {
1099                                 /*
1100                                  * Pad with zeroes to the next pointer boundary
1101                                  * (we don't want to disclose kernel data to
1102                                  * users), then advance wptr.
1103                                  */
1104                                 (void) memset(last->b_wptr, 0, pad);
1105                                 last->b_wptr += pad;
1106                         }
1107                         /* Remember where to write the header information */
1108                         sbp->sb_head = last;
1109                 } else {
1110                         /* Have to allocate additional space for the wrapper */
1111                         wrapper = allocb(wrapperlen, BPRI_MED);
1112                         if (wrapper == NULL) {
1113                                 sbclosechunk(sbp);
1114                                 return;
1115                         }
1116                         if (pad > 0) {
1117                                 /*
1118                                  * Pad with zeroes (we don't want to disclose
1119                                  * kernel data to users).
1120                                  */
1121                                 (void) memset(wrapper->b_wptr, 0, pad);
1122                                 wrapper->b_wptr += pad;
1123                         }
1124                         /* Link the wrapper msg onto the end of the chunk */
1125                         linkb(mp, wrapper);
1126                         /* Remember to write the next header in this wrapper */
1127                         sbp->sb_head = wrapper;
1128                 }
1129         }
1130 }
1131 
1132 /*
1133  * Called from timeout().
1134  * Signal a timeout by passing a zero-length M_CTL msg in the read-side
1135  * to synchronize with any active module threads (open, close, wput, rput).
1136  */
1137 static void
1138 sbtick(void *arg)
1139 {
1140         struct sb *sbp = arg;
1141         queue_t *rq;
1142 
1143         ASSERT(sbp);
1144 
1145         rq = sbp->sb_rq;
1146         sbp->sb_timeoutid = 0;               /* timeout has fired */
1147 
1148         if (putctl(rq, M_CTL) == 0)     /* failure */
1149                 sbp->sb_timeoutid = qtimeout(rq, sbtick, sbp, sbp->sb_ticks);
1150 }
1151 
1152 /*
1153  * Close off the currently accumulating chunk and pass
1154  * it upward.  Takes care of resetting timers as well.
1155  *
1156  * This routine is called both directly and as a result
1157  * of the chunk timeout expiring.
1158  */
1159 static void
1160 sbclosechunk(struct sb *sbp)
1161 {
1162         mblk_t  *mp;
1163         queue_t *rq;
1164 
1165         ASSERT(sbp);
1166 
1167         if (sbp->sb_timeoutid) {
1168                 (void) quntimeout(sbp->sb_rq, sbp->sb_timeoutid);
1169                 sbp->sb_timeoutid = 0;
1170         }
1171 
1172         mp = sbp->sb_mp;
1173         rq = sbp->sb_rq;
1174 
1175         /*
1176          * If there's currently a chunk in progress, close it off
1177          * and try to send it up.
1178          */
1179         if (mp) {
1180                 sbsendit(rq, mp);
1181         }
1182 
1183         /*
1184          * Clear old chunk.  Ready for new msgs.
1185          */
1186         sbp->sb_tail = sbp->sb_mp = sbp->sb_head = NULL;
1187         sbp->sb_mlen = 0;
1188         sbp->sb_mcount = 0;
1189         if (sbp->sb_flags & SB_DEFER_CHUNK)
1190                 sbp->sb_state &= ~SB_FRCVD;
1191 
1192 }
1193 
1194 static void
1195 sbsendit(queue_t *rq, mblk_t *mp)
1196 {
1197         struct  sb      *sbp = (struct sb *)rq->q_ptr;
1198 
1199         if (!canputnext(rq)) {
1200                 if (sbp->sb_flags & SB_NO_DROPS)
1201                         (void) putq(rq, mp);
1202                 else {
1203                         freemsg(mp);
1204                         sbp->sb_drops += sbp->sb_mcount;
1205                 }
1206                 return;
1207         }
1208         /*
1209          * If there are messages on the q already, keep
1210          * queueing them since they need to be processed in order.
1211          */
1212         if (qsize(rq) > 0) {
1213                 /* should only get here if SB_NO_DROPS */
1214                 (void) putq(rq, mp);
1215         }
1216         else
1217                 putnext(rq, mp);
1218 }