634 mutex_exit(&stp->sd_qlock); \
635 queue_service(q); \
636 mutex_enter(&stp->sd_qlock); \
637 } \
638 ASSERT(stp->sd_nqueues == 0); \
639 ASSERT((stp->sd_qhead == NULL) && (stp->sd_qtail == NULL)); \
640 }
641
642 /*
643 * Constructor/destructor routines for the stream head cache
644 */
645 /* ARGSUSED */
646 static int
647 stream_head_constructor(void *buf, void *cdrarg, int kmflags)
648 {
649 stdata_t *stp = buf;
650
651 mutex_init(&stp->sd_lock, NULL, MUTEX_DEFAULT, NULL);
652 mutex_init(&stp->sd_reflock, NULL, MUTEX_DEFAULT, NULL);
653 mutex_init(&stp->sd_qlock, NULL, MUTEX_DEFAULT, NULL);
654 cv_init(&stp->sd_monitor, NULL, CV_DEFAULT, NULL);
655 cv_init(&stp->sd_iocmonitor, NULL, CV_DEFAULT, NULL);
656 cv_init(&stp->sd_refmonitor, NULL, CV_DEFAULT, NULL);
657 cv_init(&stp->sd_qcv, NULL, CV_DEFAULT, NULL);
658 cv_init(&stp->sd_zcopy_wait, NULL, CV_DEFAULT, NULL);
659 stp->sd_wrq = NULL;
660
661 return (0);
662 }
663
664 /* ARGSUSED */
665 static void
666 stream_head_destructor(void *buf, void *cdrarg)
667 {
668 stdata_t *stp = buf;
669
670 mutex_destroy(&stp->sd_lock);
671 mutex_destroy(&stp->sd_reflock);
672 mutex_destroy(&stp->sd_qlock);
673 cv_destroy(&stp->sd_monitor);
674 cv_destroy(&stp->sd_iocmonitor);
675 cv_destroy(&stp->sd_refmonitor);
676 cv_destroy(&stp->sd_qcv);
677 cv_destroy(&stp->sd_zcopy_wait);
678 }
679
680 /*
681 * Constructor/destructor routines for the queue cache
682 */
683 /* ARGSUSED */
684 static int
685 queue_constructor(void *buf, void *cdrarg, int kmflags)
686 {
687 queinfo_t *qip = buf;
688 queue_t *qp = &qip->qu_rqueue;
689 queue_t *wqp = &qip->qu_wqueue;
690 syncq_t *sq = &qip->qu_syncq;
691
692 qp->q_first = NULL;
693 qp->q_link = NULL;
694 qp->q_count = 0;
695 qp->q_mblkcnt = 0;
696 qp->q_sqhead = NULL;
697 qp->q_sqtail = NULL;
3299 stp->sd_rderrfunc = stp->sd_wrerrfunc = NULL;
3300 stp->sd_rputdatafunc = stp->sd_wputdatafunc = NULL;
3301 stp->sd_ciputctrl = NULL;
3302 stp->sd_nciputctrl = 0;
3303 stp->sd_qhead = NULL;
3304 stp->sd_qtail = NULL;
3305 stp->sd_servid = NULL;
3306 stp->sd_nqueues = 0;
3307 stp->sd_svcflags = 0;
3308 stp->sd_copyflag = 0;
3309
3310 return (stp);
3311 }
3312
3313 /*
3314 * Free a stream head.
3315 */
3316 void
3317 shfree(stdata_t *stp)
3318 {
3319 ASSERT(MUTEX_NOT_HELD(&stp->sd_lock));
3320
3321 stp->sd_wrq = NULL;
3322
3323 mutex_enter(&stp->sd_qlock);
3324 while (stp->sd_svcflags & STRS_SCHEDULED) {
3325 STRSTAT(strwaits);
3326 cv_wait(&stp->sd_qcv, &stp->sd_qlock);
3327 }
3328 mutex_exit(&stp->sd_qlock);
3329
3330 if (stp->sd_ciputctrl != NULL) {
3331 ASSERT(stp->sd_nciputctrl == n_ciputctrl - 1);
3332 SUMCHECK_CIPUTCTRL_COUNTS(stp->sd_ciputctrl,
3333 stp->sd_nciputctrl, 0);
3334 ASSERT(ciputctrl_cache != NULL);
3335 kmem_cache_free(ciputctrl_cache, stp->sd_ciputctrl);
3336 stp->sd_ciputctrl = NULL;
3337 stp->sd_nciputctrl = 0;
3338 }
3339 ASSERT(stp->sd_qhead == NULL);
3340 ASSERT(stp->sd_qtail == NULL);
3341 ASSERT(stp->sd_nqueues == 0);
3342 kmem_cache_free(stream_head_cache, stp);
3343 }
3344
3345 /*
3346 * Allocate a pair of queues and a syncq for the pair
3347 */
3348 queue_t *
3349 allocq(void)
3350 {
3351 queinfo_t *qip;
3352 queue_t *qp, *wqp;
3353 syncq_t *sq;
3354
3355 qip = kmem_cache_alloc(queue_cache, KM_SLEEP);
3356
3357 qp = &qip->qu_rqueue;
3358 wqp = &qip->qu_wqueue;
3359 sq = &qip->qu_syncq;
3360
3361 qp->q_last = NULL;
3362 qp->q_next = NULL;
3363 qp->q_ptr = NULL;
3364 qp->q_flag = QUSE | QREADR;
8068 mutex_exit(&stp->sd_lock);
8069 pollwakeup(&stp->sd_pollist, POLLIN|POLLRDNORM);
8070 mutex_enter(&stp->sd_lock);
8071
8072 if (stp->sd_sigflags & (S_INPUT|S_RDNORM))
8073 strsendsig(stp->sd_siglist, S_INPUT|S_RDNORM, 0, 0);
8074 mutex_exit(&stp->sd_lock);
8075 }
8076
8077 void
8078 strflushrq(vnode_t *vp, int flag)
8079 {
8080 struct stdata *stp = vp->v_stream;
8081
8082 mutex_enter(&stp->sd_lock);
8083 flushq(_RD(stp->sd_wrq), flag);
8084 mutex_exit(&stp->sd_lock);
8085 }
8086
8087 void
8088 strsetrputhooks(vnode_t *vp, uint_t flags,
8089 msgfunc_t protofunc, msgfunc_t miscfunc)
8090 {
8091 struct stdata *stp = vp->v_stream;
8092
8093 mutex_enter(&stp->sd_lock);
8094
8095 if (protofunc == NULL)
8096 stp->sd_rprotofunc = strrput_proto;
8097 else
8098 stp->sd_rprotofunc = protofunc;
8099
8100 if (miscfunc == NULL)
8101 stp->sd_rmiscfunc = strrput_misc;
8102 else
8103 stp->sd_rmiscfunc = miscfunc;
8104
8105 if (flags & SH_CONSOL_DATA)
8106 stp->sd_rput_opt |= SR_CONSOL_DATA;
8107 else
8108 stp->sd_rput_opt &= ~SR_CONSOL_DATA;
8109
|
634 mutex_exit(&stp->sd_qlock); \
635 queue_service(q); \
636 mutex_enter(&stp->sd_qlock); \
637 } \
638 ASSERT(stp->sd_nqueues == 0); \
639 ASSERT((stp->sd_qhead == NULL) && (stp->sd_qtail == NULL)); \
640 }
641
642 /*
643 * Constructor/destructor routines for the stream head cache
644 */
645 /* ARGSUSED */
646 static int
647 stream_head_constructor(void *buf, void *cdrarg, int kmflags)
648 {
649 stdata_t *stp = buf;
650
651 mutex_init(&stp->sd_lock, NULL, MUTEX_DEFAULT, NULL);
652 mutex_init(&stp->sd_reflock, NULL, MUTEX_DEFAULT, NULL);
653 mutex_init(&stp->sd_qlock, NULL, MUTEX_DEFAULT, NULL);
654 mutex_init(&stp->sd_pid_tree_lock, NULL, MUTEX_DEFAULT, NULL);
655 cv_init(&stp->sd_monitor, NULL, CV_DEFAULT, NULL);
656 cv_init(&stp->sd_iocmonitor, NULL, CV_DEFAULT, NULL);
657 cv_init(&stp->sd_refmonitor, NULL, CV_DEFAULT, NULL);
658 cv_init(&stp->sd_qcv, NULL, CV_DEFAULT, NULL);
659 cv_init(&stp->sd_zcopy_wait, NULL, CV_DEFAULT, NULL);
660 avl_create(&stp->sd_pid_tree, pid_node_comparator, sizeof (pid_node_t),
661 offsetof(pid_node_t, pn_ref_link));
662 stp->sd_wrq = NULL;
663
664 return (0);
665 }
666
667 /* ARGSUSED */
668 static void
669 stream_head_destructor(void *buf, void *cdrarg)
670 {
671 stdata_t *stp = buf;
672
673 mutex_destroy(&stp->sd_lock);
674 mutex_destroy(&stp->sd_reflock);
675 mutex_destroy(&stp->sd_qlock);
676 mutex_destroy(&stp->sd_pid_tree_lock);
677 cv_destroy(&stp->sd_monitor);
678 cv_destroy(&stp->sd_iocmonitor);
679 cv_destroy(&stp->sd_refmonitor);
680 cv_destroy(&stp->sd_qcv);
681 cv_destroy(&stp->sd_zcopy_wait);
682 avl_destroy(&stp->sd_pid_tree);
683 }
684
685 /*
686 * Constructor/destructor routines for the queue cache
687 */
688 /* ARGSUSED */
689 static int
690 queue_constructor(void *buf, void *cdrarg, int kmflags)
691 {
692 queinfo_t *qip = buf;
693 queue_t *qp = &qip->qu_rqueue;
694 queue_t *wqp = &qip->qu_wqueue;
695 syncq_t *sq = &qip->qu_syncq;
696
697 qp->q_first = NULL;
698 qp->q_link = NULL;
699 qp->q_count = 0;
700 qp->q_mblkcnt = 0;
701 qp->q_sqhead = NULL;
702 qp->q_sqtail = NULL;
3304 stp->sd_rderrfunc = stp->sd_wrerrfunc = NULL;
3305 stp->sd_rputdatafunc = stp->sd_wputdatafunc = NULL;
3306 stp->sd_ciputctrl = NULL;
3307 stp->sd_nciputctrl = 0;
3308 stp->sd_qhead = NULL;
3309 stp->sd_qtail = NULL;
3310 stp->sd_servid = NULL;
3311 stp->sd_nqueues = 0;
3312 stp->sd_svcflags = 0;
3313 stp->sd_copyflag = 0;
3314
3315 return (stp);
3316 }
3317
3318 /*
3319 * Free a stream head.
3320 */
3321 void
3322 shfree(stdata_t *stp)
3323 {
3324 pid_node_t *pn;
3325
3326 ASSERT(MUTEX_NOT_HELD(&stp->sd_lock));
3327
3328 stp->sd_wrq = NULL;
3329
3330 mutex_enter(&stp->sd_qlock);
3331 while (stp->sd_svcflags & STRS_SCHEDULED) {
3332 STRSTAT(strwaits);
3333 cv_wait(&stp->sd_qcv, &stp->sd_qlock);
3334 }
3335 mutex_exit(&stp->sd_qlock);
3336
3337 if (stp->sd_ciputctrl != NULL) {
3338 ASSERT(stp->sd_nciputctrl == n_ciputctrl - 1);
3339 SUMCHECK_CIPUTCTRL_COUNTS(stp->sd_ciputctrl,
3340 stp->sd_nciputctrl, 0);
3341 ASSERT(ciputctrl_cache != NULL);
3342 kmem_cache_free(ciputctrl_cache, stp->sd_ciputctrl);
3343 stp->sd_ciputctrl = NULL;
3344 stp->sd_nciputctrl = 0;
3345 }
3346 ASSERT(stp->sd_qhead == NULL);
3347 ASSERT(stp->sd_qtail == NULL);
3348 ASSERT(stp->sd_nqueues == 0);
3349
3350 mutex_enter(&stp->sd_pid_tree_lock);
3351 while ((pn = avl_first(&stp->sd_pid_tree)) != NULL) {
3352 avl_remove(&stp->sd_pid_tree, pn);
3353 kmem_free(pn, sizeof (*pn));
3354 }
3355 mutex_exit(&stp->sd_pid_tree_lock);
3356
3357 kmem_cache_free(stream_head_cache, stp);
3358 }
3359
3360 void
3361 sh_insert_pid(struct stdata *stp, pid_t pid)
3362 {
3363 pid_node_t *pn, lookup_pn;
3364 avl_index_t idx_pn;
3365
3366 lookup_pn.pn_pid = pid;
3367 mutex_enter(&stp->sd_pid_tree_lock);
3368 pn = avl_find(&stp->sd_pid_tree, &lookup_pn, &idx_pn);
3369
3370 if (pn != NULL) {
3371 pn->pn_count++;
3372 } else {
3373 pn = kmem_zalloc(sizeof (*pn), KM_SLEEP);
3374 pn->pn_pid = pid;
3375 pn->pn_count = 1;
3376 avl_insert(&stp->sd_pid_tree, pn, idx_pn);
3377 }
3378 mutex_exit(&stp->sd_pid_tree_lock);
3379 }
3380
3381 void
3382 sh_remove_pid(struct stdata *stp, pid_t pid)
3383 {
3384 pid_node_t *pn, lookup_pn;
3385
3386 lookup_pn.pn_pid = pid;
3387 mutex_enter(&stp->sd_pid_tree_lock);
3388 pn = avl_find(&stp->sd_pid_tree, &lookup_pn, NULL);
3389
3390 if (pn != NULL) {
3391 if (pn->pn_count > 1) {
3392 pn->pn_count--;
3393 } else {
3394 avl_remove(&stp->sd_pid_tree, pn);
3395 kmem_free(pn, sizeof (*pn));
3396 }
3397 }
3398 mutex_exit(&stp->sd_pid_tree_lock);
3399 }
3400
3401 mblk_t *
3402 sh_get_pid_mblk(struct stdata *stp)
3403 {
3404 mblk_t *mblk;
3405 ulong_t sz, n;
3406 pid_t *pids;
3407 pid_node_t *pn;
3408 conn_pid_info_t *cpi;
3409
3410 mutex_enter(&stp->sd_pid_tree_lock);
3411
3412 n = avl_numnodes(&stp->sd_pid_tree);
3413 sz = sizeof (conn_pid_info_t);
3414 sz += (n > 1) ? ((n - 1) * sizeof (pid_t)) : 0;
3415 if ((mblk = allocb(sz, BPRI_HI)) == NULL) {
3416 mutex_exit(&stp->sd_pid_tree_lock);
3417 return (NULL);
3418 }
3419 mblk->b_wptr += sz;
3420 cpi = (conn_pid_info_t *)mblk->b_datap->db_base;
3421 cpi->cpi_magic = CONN_PID_INFO_MGC;
3422 cpi->cpi_contents = CONN_PID_INFO_XTI;
3423 cpi->cpi_pids_cnt = n;
3424 cpi->cpi_tot_size = sz;
3425 cpi->cpi_pids[0] = 0;
3426
3427 if (cpi->cpi_pids_cnt > 0) {
3428 pids = cpi->cpi_pids;
3429 for (pn = avl_first(&stp->sd_pid_tree); pn != NULL;
3430 pids++, pn = AVL_NEXT(&stp->sd_pid_tree, pn))
3431 *pids = pn->pn_pid;
3432 }
3433 mutex_exit(&stp->sd_pid_tree_lock);
3434 return (mblk);
3435 }
3436
3437 /*
3438 * Allocate a pair of queues and a syncq for the pair
3439 */
3440 queue_t *
3441 allocq(void)
3442 {
3443 queinfo_t *qip;
3444 queue_t *qp, *wqp;
3445 syncq_t *sq;
3446
3447 qip = kmem_cache_alloc(queue_cache, KM_SLEEP);
3448
3449 qp = &qip->qu_rqueue;
3450 wqp = &qip->qu_wqueue;
3451 sq = &qip->qu_syncq;
3452
3453 qp->q_last = NULL;
3454 qp->q_next = NULL;
3455 qp->q_ptr = NULL;
3456 qp->q_flag = QUSE | QREADR;
8160 mutex_exit(&stp->sd_lock);
8161 pollwakeup(&stp->sd_pollist, POLLIN|POLLRDNORM);
8162 mutex_enter(&stp->sd_lock);
8163
8164 if (stp->sd_sigflags & (S_INPUT|S_RDNORM))
8165 strsendsig(stp->sd_siglist, S_INPUT|S_RDNORM, 0, 0);
8166 mutex_exit(&stp->sd_lock);
8167 }
8168
8169 void
8170 strflushrq(vnode_t *vp, int flag)
8171 {
8172 struct stdata *stp = vp->v_stream;
8173
8174 mutex_enter(&stp->sd_lock);
8175 flushq(_RD(stp->sd_wrq), flag);
8176 mutex_exit(&stp->sd_lock);
8177 }
8178
8179 void
8180 strsetrputhooks(vnode_t *vp, uint_t flags, msgfunc_t protofunc,
8181 msgfunc_t miscfunc)
8182 {
8183 struct stdata *stp = vp->v_stream;
8184
8185 mutex_enter(&stp->sd_lock);
8186
8187 if (protofunc == NULL)
8188 stp->sd_rprotofunc = strrput_proto;
8189 else
8190 stp->sd_rprotofunc = protofunc;
8191
8192 if (miscfunc == NULL)
8193 stp->sd_rmiscfunc = strrput_misc;
8194 else
8195 stp->sd_rmiscfunc = miscfunc;
8196
8197 if (flags & SH_CONSOL_DATA)
8198 stp->sd_rput_opt |= SR_CONSOL_DATA;
8199 else
8200 stp->sd_rput_opt &= ~SR_CONSOL_DATA;
8201
|