1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright 2010 Sun Microsystems, Inc. All rights reserved. 23 * Use is subject to license terms. 24 */ 25 26 /* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */ 27 /* All Rights Reserved */ 28 29 30 /* 31 * Inter-Process Communication Message Facility. 32 * 33 * See os/ipc.c for a description of common IPC functionality. 34 * 35 * Resource controls 36 * ----------------- 37 * 38 * Control: zone.max-msg-ids (rc_zone_msgmni) 39 * Description: Maximum number of message queue ids allowed a zone. 40 * 41 * When msgget() is used to allocate a message queue, one id is 42 * allocated. If the id allocation doesn't succeed, msgget() fails 43 * and errno is set to ENOSPC. Upon successful msgctl(, IPC_RMID) 44 * the id is deallocated. 45 * 46 * Control: project.max-msg-ids (rc_project_msgmni) 47 * Description: Maximum number of message queue ids allowed a project. 48 * 49 * When msgget() is used to allocate a message queue, one id is 50 * allocated. If the id allocation doesn't succeed, msgget() fails 51 * and errno is set to ENOSPC. Upon successful msgctl(, IPC_RMID) 52 * the id is deallocated. 
53 * 54 * Control: process.max-msg-qbytes (rc_process_msgmnb) 55 * Description: Maximum number of bytes of messages on a message queue. 56 * 57 * When msgget() successfully allocates a message queue, the minimum 58 * enforced value of this limit is used to initialize msg_qbytes. 59 * 60 * Control: process.max-msg-messages (rc_process_msgtql) 61 * Description: Maximum number of messages on a message queue. 62 * 63 * When msgget() successfully allocates a message queue, the minimum 64 * enforced value of this limit is used to initialize a per-queue 65 * limit on the number of messages. 66 */ 67 68 #include <sys/types.h> 69 #include <sys/t_lock.h> 70 #include <sys/param.h> 71 #include <sys/cred.h> 72 #include <sys/user.h> 73 #include <sys/proc.h> 74 #include <sys/time.h> 75 #include <sys/ipc.h> 76 #include <sys/ipc_impl.h> 77 #include <sys/msg.h> 78 #include <sys/msg_impl.h> 79 #include <sys/list.h> 80 #include <sys/systm.h> 81 #include <sys/sysmacros.h> 82 #include <sys/cpuvar.h> 83 #include <sys/kmem.h> 84 #include <sys/ddi.h> 85 #include <sys/errno.h> 86 #include <sys/cmn_err.h> 87 #include <sys/debug.h> 88 #include <sys/project.h> 89 #include <sys/modctl.h> 90 #include <sys/syscall.h> 91 #include <sys/policy.h> 92 #include <sys/zone.h> 93 94 #include <c2/audit.h> 95 96 /* 97 * The following tunables are obsolete. Though for compatibility we 98 * still read and interpret msginfo_msgmnb, msginfo_msgmni, and 99 * msginfo_msgtql (see os/project.c and os/rctl_proc.c), the preferred 100 * mechanism for administrating the IPC Message facility is through the 101 * resource controls described at the top of this file. 
102 */ 103 size_t msginfo_msgmax = 2048; /* (obsolete) */ 104 size_t msginfo_msgmnb = 4096; /* (obsolete) */ 105 int msginfo_msgmni = 50; /* (obsolete) */ 106 int msginfo_msgtql = 40; /* (obsolete) */ 107 int msginfo_msgssz = 8; /* (obsolete) */ 108 int msginfo_msgmap = 0; /* (obsolete) */ 109 ushort_t msginfo_msgseg = 1024; /* (obsolete) */ 110 111 extern rctl_hndl_t rc_zone_msgmni; 112 extern rctl_hndl_t rc_project_msgmni; 113 extern rctl_hndl_t rc_process_msgmnb; 114 extern rctl_hndl_t rc_process_msgtql; 115 static ipc_service_t *msq_svc; 116 static zone_key_t msg_zone_key; 117 118 static void msg_dtor(kipc_perm_t *); 119 static void msg_rmid(kipc_perm_t *); 120 static void msg_remove_zone(zoneid_t, void *); 121 122 /* 123 * Module linkage information for the kernel. 124 */ 125 static ssize_t msgsys(int opcode, uintptr_t a0, uintptr_t a1, uintptr_t a2, 126 uintptr_t a4, uintptr_t a5); 127 128 static struct sysent ipcmsg_sysent = { 129 6, 130 #ifdef _LP64 131 SE_ARGC | SE_NOUNLOAD | SE_64RVAL, 132 #else 133 SE_ARGC | SE_NOUNLOAD | SE_32RVAL1, 134 #endif 135 (int (*)())msgsys 136 }; 137 138 #ifdef _SYSCALL32_IMPL 139 static ssize32_t msgsys32(int opcode, uint32_t a0, uint32_t a1, uint32_t a2, 140 uint32_t a4, uint32_t a5); 141 142 static struct sysent ipcmsg_sysent32 = { 143 6, 144 SE_ARGC | SE_NOUNLOAD | SE_32RVAL1, 145 (int (*)())msgsys32 146 }; 147 #endif /* _SYSCALL32_IMPL */ 148 149 static struct modlsys modlsys = { 150 &mod_syscallops, "System V message facility", &ipcmsg_sysent 151 }; 152 153 #ifdef _SYSCALL32_IMPL 154 static struct modlsys modlsys32 = { 155 &mod_syscallops32, "32-bit System V message facility", &ipcmsg_sysent32 156 }; 157 #endif 158 159 /* 160 * Big Theory statement for message queue correctness 161 * 162 * The msgrcv and msgsnd functions no longer uses cv_broadcast to wake up 163 * receivers who are waiting for an event. 
Using the cv_broadcast method
 * resulted in negative scaling when the number of waiting receivers is large
 * (the thundering herd problem).  Instead, the receivers waiting to receive a
 * message are now linked in a queue-like fashion and awakened one at a time
 * in a controlled manner.
 *
 * Receivers can block on two different classes of waiting list:
 *    1) "sendwait" list, which is the more complex list of the two.  The
 *       receiver will be awakened by a sender posting a new message.  There
 *       are two types of "sendwait" list used:
 *              a) msg_wait_snd: handles all receivers who are looking for
 *                 a message type >= 0, but were unable to locate a match.
 *
 *                 slot 0: reserved for receivers that have designated they
 *                         will take any message type.
 *                 rest:   consists of receivers requesting a specific type
 *                         but the type was not present.  The entries are
 *                         hashed into a bucket in an attempt to keep
 *                         any list search relatively short.
 *              b) msg_wait_snd_ngt: handles all receivers that have designated
 *                 a negative message type.  Unlike msg_wait_snd, the hash
 *                 bucket serves a range of negative message types (-1 to -5,
 *                 -6 to -10 and so forth), where the last bucket is reserved
 *                 for all the negative message types that hash outside of
 *                 MSG_MAX_QNUM - 1.  This is done to simplify the operation
 *                 of locating a negative message type.
 *
 *    2) "copyout" list, where the receiver is awakened by another
 *       receiver after a message is copied out.  This is a linked list
 *       of waiters that are awakened one at a time.  Although the solution is
 *       not optimal, the complexity that would be added in for waking
 *       up the right entry far exceeds any potential pay back (too many
 *       correctness and corner case issues).
 *
 * The lists are doubly linked.
In the case of the "sendwait"
 * list, this allows the thread to remove itself from the list without having
 * to traverse the list.  In the case of the "copyout" list it simply allows
 * us to use common functions with the "sendwait" list.
 *
 * To make sure receivers are not hung out to dry, we must guarantee:
 *    1. If any queued message matches any receiver, then at least one
 *       matching receiver must be processing the request.
 *    2. Blocking on the copyout queue is only temporary while messages
 *       are being copied out.  The process is guaranteed to wake up
 *       when it gets to the front of the queue (copyout is a FIFO).
 *
 * Rules for blocking and waking up:
 *   1) A receiver entering msgrcv must examine all messages for a match
 *      before blocking on a sendwait queue.
 *   2) If the receiver blocks because the message it chose is already
 *      being copied out, then when it wakes up it needs to start
 *      checking the messages from the beginning.
 *   3) Whenever a process returns from msgrcv for any reason, if it
 *      had attempted to copy a message or blocked waiting for a copy
 *      to complete it needs to wake up the next receiver blocked on
 *      a copy out.
 *   4) When a message is sent, the sender selects a process waiting
 *      for that type of message.  This selection process rotates between
 *      receiver types of 0, negative and positive to prevent starvation of
 *      any one particular receiver type.
 *   5) The following are the scenarios for processes that are awakened
 *      by a msgsnd:
 *              a) The process finds the message and is able to copy
 *                 it out.  Once complete, the process returns.
 *              b) The message that was sent that triggered the wakeup is no
 *                 longer available (another process found the message first).
 *                 We issue a wakeup on the copy queue and then go back to
 *                 sleep waiting for another matching message to be sent.
 *              c) The message that was supposed to be processed was
 *                 already serviced by another process.  However a different
 *                 message is present which we can service.  The message
 *                 is copied and the process returns.
 *              d) The message is found, but some sort of error occurs that
 *                 prevents the message from being copied.  The receiver
 *                 wakes up the next sender that can service this message
 *                 type and returns an error to the caller.
 *              e) The message is found, but it is marked as being copied
 *                 out.  The receiver then goes to sleep on the copyout
 *                 queue where it will be awakened again sometime in the
 *                 future.
 *
 *
 *   6) Whenever a message is found that matches the message type designated,
 *      but is being copied out, we have to block on the copyout queue.
 *      After the copying process finishes the copy out, it must wake up
 *      (either directly or indirectly) all receivers who blocked on its
 *      copyout, so they are guaranteed a chance to examine the remaining
 *      messages.  This is implemented via a chain of wakeups: Y wakes X, who
 *      wakes Z, and so on.  The chain cannot be broken.  This leads to the
 *      following cases:
 *              a) When a receiver has finished copying the message (or has
 *                 encountered an error), the first entry on the copyout
 *                 queue is woken up.
 *              b) When the receiver is woken up, it attempts to locate
 *                 a message type match.
 *              c) If a message type is found and
 *                      -- MSG_RCVCOPY flag is not set, the message is
 *                         marked for copying out.  Regardless of the copyout
 *                         success the next entry on the copyout queue is
 *                         awakened and the operation is completed.
 *                      -- MSG_RCVCOPY is set, we simply go back to sleep again
 *                         on the copyout queue.
 *              d) If the message type is not found then we wake up the next
 *                 process on the copyout queue.
266 * 7) If a msgsnd is unable to complete for of any of the following reasons 267 * a) the msgq has no space for the message 268 * b) the maximum number of messages allowed has been reached 269 * then one of two things happen: 270 * 1) If the passed in msg_flag has IPC_NOWAIT set, then 271 * an error is returned. 272 * 2) The IPC_NOWAIT bit is not set in msg_flag, then the 273 * the thread is placed to sleep until the request can be 274 * serviced. 275 * 8) When waking a thread waiting to send a message, a check is done to 276 * verify that the operation being asked for by the thread will complete. 277 * This decision making process is done in a loop where the oldest request 278 * is checked first. The search will continue until there is no more 279 * room on the msgq or we have checked all the waiters. 280 */ 281 282 static uint_t msg_type_hash(long); 283 static int msgq_check_err(kmsqid_t *qp, int cvres); 284 static int msg_rcvq_sleep(list_t *, msgq_wakeup_t *, kmutex_t **, 285 kmsqid_t *); 286 static int msg_copyout(kmsqid_t *, long, kmutex_t **, size_t *, size_t, 287 struct msg *, struct ipcmsgbuf *, int); 288 static void msg_rcvq_wakeup_all(list_t *); 289 static void msg_wakeup_senders(kmsqid_t *); 290 static void msg_wakeup_rdr(kmsqid_t *, msg_select_t **, long); 291 static msgq_wakeup_t *msg_fnd_any_snd(kmsqid_t *, int, long); 292 static msgq_wakeup_t *msg_fnd_any_rdr(kmsqid_t *, int, long); 293 static msgq_wakeup_t *msg_fnd_neg_snd(kmsqid_t *, int, long); 294 static msgq_wakeup_t *msg_fnd_spc_snd(kmsqid_t *, int, long); 295 static struct msg *msgrcv_lookup(kmsqid_t *, long); 296 297 msg_select_t msg_fnd_sndr[] = { 298 { msg_fnd_any_snd, &msg_fnd_sndr[1] }, 299 { msg_fnd_spc_snd, &msg_fnd_sndr[2] }, 300 { msg_fnd_neg_snd, &msg_fnd_sndr[0] } 301 }; 302 303 msg_select_t msg_fnd_rdr[1] = { 304 { msg_fnd_any_rdr, &msg_fnd_rdr[0] }, 305 }; 306 307 static struct modlinkage modlinkage = { 308 MODREV_1, 309 { &modlsys, 310 #ifdef _SYSCALL32_IMPL 311 &modlsys32, 312 
#endif 313 NULL 314 } 315 }; 316 317 #define MSG_SMALL_INIT (size_t)-1 318 int 319 _init(void) 320 { 321 int result; 322 323 msq_svc = ipcs_create("msqids", rc_project_msgmni, rc_zone_msgmni, 324 sizeof (kmsqid_t), msg_dtor, msg_rmid, AT_IPC_MSG, 325 offsetof(ipc_rqty_t, ipcq_msgmni)); 326 zone_key_create(&msg_zone_key, NULL, msg_remove_zone, NULL); 327 328 if ((result = mod_install(&modlinkage)) == 0) 329 return (0); 330 331 (void) zone_key_delete(msg_zone_key); 332 ipcs_destroy(msq_svc); 333 334 return (result); 335 } 336 337 int 338 _fini(void) 339 { 340 return (EBUSY); 341 } 342 343 int 344 _info(struct modinfo *modinfop) 345 { 346 return (mod_info(&modlinkage, modinfop)); 347 } 348 349 static void 350 msg_dtor(kipc_perm_t *perm) 351 { 352 kmsqid_t *qp = (kmsqid_t *)perm; 353 int ii; 354 355 for (ii = 0; ii <= MSG_MAX_QNUM; ii++) { 356 ASSERT(list_is_empty(&qp->msg_wait_snd[ii])); 357 ASSERT(list_is_empty(&qp->msg_wait_snd_ngt[ii])); 358 list_destroy(&qp->msg_wait_snd[ii]); 359 list_destroy(&qp->msg_wait_snd_ngt[ii]); 360 } 361 ASSERT(list_is_empty(&qp->msg_cpy_block)); 362 ASSERT(list_is_empty(&qp->msg_wait_rcv)); 363 list_destroy(&qp->msg_cpy_block); 364 ASSERT(qp->msg_snd_cnt == 0); 365 ASSERT(qp->msg_cbytes == 0); 366 list_destroy(&qp->msg_list); 367 list_destroy(&qp->msg_wait_rcv); 368 } 369 370 371 #define msg_hold(mp) (mp)->msg_copycnt++ 372 373 /* 374 * msg_rele - decrement the reference count on the message. When count 375 * reaches zero, free message header and contents. 376 */ 377 static void 378 msg_rele(struct msg *mp) 379 { 380 ASSERT(mp->msg_copycnt > 0); 381 if (mp->msg_copycnt-- == 1) { 382 if (mp->msg_addr) 383 kmem_free(mp->msg_addr, mp->msg_size); 384 kmem_free(mp, sizeof (struct msg)); 385 } 386 } 387 388 /* 389 * msgunlink - Unlink msg from queue, decrement byte count and wake up anyone 390 * waiting for free bytes on queue. 391 * 392 * Called with queue locked. 
393 */ 394 static void 395 msgunlink(kmsqid_t *qp, struct msg *mp) 396 { 397 list_remove(&qp->msg_list, mp); 398 qp->msg_qnum--; 399 qp->msg_cbytes -= mp->msg_size; 400 msg_rele(mp); 401 402 /* Wake up waiting writers */ 403 msg_wakeup_senders(qp); 404 } 405 406 static void 407 msg_rmid(kipc_perm_t *perm) 408 { 409 kmsqid_t *qp = (kmsqid_t *)perm; 410 struct msg *mp; 411 int ii; 412 413 414 while ((mp = list_head(&qp->msg_list)) != NULL) 415 msgunlink(qp, mp); 416 ASSERT(qp->msg_cbytes == 0); 417 418 /* 419 * Wake up everyone who is in a wait state of some sort 420 * for this message queue. 421 */ 422 for (ii = 0; ii <= MSG_MAX_QNUM; ii++) { 423 msg_rcvq_wakeup_all(&qp->msg_wait_snd[ii]); 424 msg_rcvq_wakeup_all(&qp->msg_wait_snd_ngt[ii]); 425 } 426 msg_rcvq_wakeup_all(&qp->msg_cpy_block); 427 msg_rcvq_wakeup_all(&qp->msg_wait_rcv); 428 } 429 430 /* 431 * msgctl system call. 432 * 433 * gets q lock (via ipc_lookup), releases before return. 434 * may call users of msg_lock 435 */ 436 static int 437 msgctl(int msgid, int cmd, void *arg) 438 { 439 STRUCT_DECL(msqid_ds, ds); /* SVR4 queue work area */ 440 kmsqid_t *qp; /* ptr to associated q */ 441 int error; 442 struct cred *cr; 443 model_t mdl = get_udatamodel(); 444 struct msqid_ds64 ds64; 445 kmutex_t *lock; 446 proc_t *pp = curproc; 447 448 STRUCT_INIT(ds, mdl); 449 cr = CRED(); 450 451 /* 452 * Perform pre- or non-lookup actions (e.g. copyins, RMID). 
453 */ 454 switch (cmd) { 455 case IPC_SET: 456 if (copyin(arg, STRUCT_BUF(ds), STRUCT_SIZE(ds))) 457 return (set_errno(EFAULT)); 458 break; 459 460 case IPC_SET64: 461 if (copyin(arg, &ds64, sizeof (struct msqid_ds64))) 462 return (set_errno(EFAULT)); 463 break; 464 465 case IPC_RMID: 466 if (error = ipc_rmid(msq_svc, msgid, cr)) 467 return (set_errno(error)); 468 return (0); 469 } 470 471 /* 472 * get msqid_ds for this msgid 473 */ 474 if ((lock = ipc_lookup(msq_svc, msgid, (kipc_perm_t **)&qp)) == NULL) 475 return (set_errno(EINVAL)); 476 477 switch (cmd) { 478 case IPC_SET: 479 if (STRUCT_FGET(ds, msg_qbytes) > qp->msg_qbytes && 480 secpolicy_ipc_config(cr) != 0) { 481 mutex_exit(lock); 482 return (set_errno(EPERM)); 483 } 484 if (error = ipcperm_set(msq_svc, cr, &qp->msg_perm, 485 &STRUCT_BUF(ds)->msg_perm, mdl)) { 486 mutex_exit(lock); 487 return (set_errno(error)); 488 } 489 qp->msg_qbytes = STRUCT_FGET(ds, msg_qbytes); 490 qp->msg_ctime = gethrestime_sec(); 491 break; 492 493 case IPC_STAT: 494 if (error = ipcperm_access(&qp->msg_perm, MSG_R, cr)) { 495 mutex_exit(lock); 496 return (set_errno(error)); 497 } 498 499 if (qp->msg_rcv_cnt) 500 qp->msg_perm.ipc_mode |= MSG_RWAIT; 501 if (qp->msg_snd_cnt) 502 qp->msg_perm.ipc_mode |= MSG_WWAIT; 503 ipcperm_stat(&STRUCT_BUF(ds)->msg_perm, &qp->msg_perm, mdl); 504 qp->msg_perm.ipc_mode &= ~(MSG_RWAIT|MSG_WWAIT); 505 STRUCT_FSETP(ds, msg_first, NULL); /* kernel addr */ 506 STRUCT_FSETP(ds, msg_last, NULL); 507 STRUCT_FSET(ds, msg_cbytes, qp->msg_cbytes); 508 STRUCT_FSET(ds, msg_qnum, qp->msg_qnum); 509 STRUCT_FSET(ds, msg_qbytes, qp->msg_qbytes); 510 STRUCT_FSET(ds, msg_lspid, qp->msg_lspid); 511 STRUCT_FSET(ds, msg_lrpid, qp->msg_lrpid); 512 STRUCT_FSET(ds, msg_stime, qp->msg_stime); 513 STRUCT_FSET(ds, msg_rtime, qp->msg_rtime); 514 STRUCT_FSET(ds, msg_ctime, qp->msg_ctime); 515 break; 516 517 case IPC_SET64: 518 mutex_enter(&pp->p_lock); 519 if ((ds64.msgx_qbytes > qp->msg_qbytes) && 520 secpolicy_ipc_config(cr) 
!= 0 && 521 rctl_test(rc_process_msgmnb, pp->p_rctls, pp, 522 ds64.msgx_qbytes, RCA_SAFE) & RCT_DENY) { 523 mutex_exit(&pp->p_lock); 524 mutex_exit(lock); 525 return (set_errno(EPERM)); 526 } 527 mutex_exit(&pp->p_lock); 528 if (error = ipcperm_set64(msq_svc, cr, &qp->msg_perm, 529 &ds64.msgx_perm)) { 530 mutex_exit(lock); 531 return (set_errno(error)); 532 } 533 qp->msg_qbytes = ds64.msgx_qbytes; 534 qp->msg_ctime = gethrestime_sec(); 535 break; 536 537 case IPC_STAT64: 538 if (qp->msg_rcv_cnt) 539 qp->msg_perm.ipc_mode |= MSG_RWAIT; 540 if (qp->msg_snd_cnt) 541 qp->msg_perm.ipc_mode |= MSG_WWAIT; 542 ipcperm_stat64(&ds64.msgx_perm, &qp->msg_perm); 543 qp->msg_perm.ipc_mode &= ~(MSG_RWAIT|MSG_WWAIT); 544 ds64.msgx_cbytes = qp->msg_cbytes; 545 ds64.msgx_qnum = qp->msg_qnum; 546 ds64.msgx_qbytes = qp->msg_qbytes; 547 ds64.msgx_lspid = qp->msg_lspid; 548 ds64.msgx_lrpid = qp->msg_lrpid; 549 ds64.msgx_stime = qp->msg_stime; 550 ds64.msgx_rtime = qp->msg_rtime; 551 ds64.msgx_ctime = qp->msg_ctime; 552 break; 553 554 default: 555 mutex_exit(lock); 556 return (set_errno(EINVAL)); 557 } 558 559 mutex_exit(lock); 560 561 /* 562 * Do copyout last (after releasing mutex). 563 */ 564 switch (cmd) { 565 case IPC_STAT: 566 if (copyout(STRUCT_BUF(ds), arg, STRUCT_SIZE(ds))) 567 return (set_errno(EFAULT)); 568 break; 569 570 case IPC_STAT64: 571 if (copyout(&ds64, arg, sizeof (struct msqid_ds64))) 572 return (set_errno(EFAULT)); 573 break; 574 } 575 576 return (0); 577 } 578 579 /* 580 * Remove all message queues associated with a given zone. Called by 581 * zone_shutdown when the zone is halted. 582 */ 583 /*ARGSUSED1*/ 584 static void 585 msg_remove_zone(zoneid_t zoneid, void *arg) 586 { 587 ipc_remove_zone(msq_svc, zoneid); 588 } 589 590 /* 591 * msgget system call. 
592 */ 593 static int 594 msgget(key_t key, int msgflg) 595 { 596 kmsqid_t *qp; 597 kmutex_t *lock; 598 int id, error; 599 int ii; 600 proc_t *pp = curproc; 601 602 top: 603 if (error = ipc_get(msq_svc, key, msgflg, (kipc_perm_t **)&qp, &lock)) 604 return (set_errno(error)); 605 606 if (IPC_FREE(&qp->msg_perm)) { 607 mutex_exit(lock); 608 mutex_exit(&pp->p_lock); 609 610 list_create(&qp->msg_list, sizeof (struct msg), 611 offsetof(struct msg, msg_node)); 612 qp->msg_qnum = 0; 613 qp->msg_lspid = qp->msg_lrpid = 0; 614 qp->msg_stime = qp->msg_rtime = 0; 615 qp->msg_ctime = gethrestime_sec(); 616 qp->msg_ngt_cnt = 0; 617 qp->msg_neg_copy = 0; 618 for (ii = 0; ii <= MSG_MAX_QNUM; ii++) { 619 list_create(&qp->msg_wait_snd[ii], 620 sizeof (msgq_wakeup_t), 621 offsetof(msgq_wakeup_t, msgw_list)); 622 list_create(&qp->msg_wait_snd_ngt[ii], 623 sizeof (msgq_wakeup_t), 624 offsetof(msgq_wakeup_t, msgw_list)); 625 } 626 /* 627 * The proper initialization of msg_lowest_type is to the 628 * highest possible value. By doing this we guarantee that 629 * when the first send happens, the lowest type will be set 630 * properly. 
631 */ 632 qp->msg_lowest_type = MSG_SMALL_INIT; 633 list_create(&qp->msg_cpy_block, 634 sizeof (msgq_wakeup_t), 635 offsetof(msgq_wakeup_t, msgw_list)); 636 list_create(&qp->msg_wait_rcv, 637 sizeof (msgq_wakeup_t), 638 offsetof(msgq_wakeup_t, msgw_list)); 639 qp->msg_fnd_sndr = &msg_fnd_sndr[0]; 640 qp->msg_fnd_rdr = &msg_fnd_rdr[0]; 641 qp->msg_rcv_cnt = 0; 642 qp->msg_snd_cnt = 0; 643 qp->msg_snd_smallest = MSG_SMALL_INIT; 644 645 if (error = ipc_commit_begin(msq_svc, key, msgflg, 646 (kipc_perm_t *)qp)) { 647 if (error == EAGAIN) 648 goto top; 649 return (set_errno(error)); 650 } 651 qp->msg_qbytes = rctl_enforced_value(rc_process_msgmnb, 652 pp->p_rctls, pp); 653 qp->msg_qmax = rctl_enforced_value(rc_process_msgtql, 654 pp->p_rctls, pp); 655 lock = ipc_commit_end(msq_svc, &qp->msg_perm); 656 } 657 658 if (AU_AUDITING()) 659 audit_ipcget(AT_IPC_MSG, (void *)qp); 660 661 id = qp->msg_perm.ipc_id; 662 mutex_exit(lock); 663 return (id); 664 } 665 666 static ssize_t 667 msgrcv(int msqid, struct ipcmsgbuf *msgp, size_t msgsz, long msgtyp, int msgflg) 668 { 669 struct msg *smp; /* ptr to best msg on q */ 670 kmsqid_t *qp; /* ptr to associated q */ 671 kmutex_t *lock; 672 size_t xtsz; /* transfer byte count */ 673 int error = 0; 674 int cvres; 675 uint_t msg_hash; 676 msgq_wakeup_t msg_entry; 677 678 CPU_STATS_ADDQ(CPU, sys, msg, 1); /* bump msg send/rcv count */ 679 680 msg_hash = msg_type_hash(msgtyp); 681 if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL) { 682 return ((ssize_t)set_errno(EINVAL)); 683 } 684 ipc_hold(msq_svc, (kipc_perm_t *)qp); 685 686 if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED())) { 687 goto msgrcv_out; 688 } 689 690 /* 691 * Various information (including the condvar_t) required for the 692 * process to sleep is provided by it's stack. 
693 */ 694 msg_entry.msgw_thrd = curthread; 695 msg_entry.msgw_snd_wake = 0; 696 msg_entry.msgw_type = msgtyp; 697 findmsg: 698 smp = msgrcv_lookup(qp, msgtyp); 699 700 if (smp) { 701 /* 702 * We found a possible message to copy out. 703 */ 704 if ((smp->msg_flags & MSG_RCVCOPY) == 0) { 705 long t = msg_entry.msgw_snd_wake; 706 long copy_type = smp->msg_type; 707 708 /* 709 * It is available, attempt to copy it. 710 */ 711 error = msg_copyout(qp, msgtyp, &lock, &xtsz, msgsz, 712 smp, msgp, msgflg); 713 714 /* 715 * It is possible to consume a different message 716 * type then what originally awakened for (negative 717 * types). If this happens a check must be done to 718 * to determine if another receiver is available 719 * for the waking message type, Failure to do this 720 * can result in a message on the queue that can be 721 * serviced by a sleeping receiver. 722 */ 723 if (!error && t && (copy_type != t)) 724 msg_wakeup_rdr(qp, &qp->msg_fnd_sndr, t); 725 726 /* 727 * Don't forget to wakeup a sleeper that blocked because 728 * we were copying things out. 729 */ 730 msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0); 731 goto msgrcv_out; 732 } 733 /* 734 * The selected message is being copied out, so block. We do 735 * not need to wake the next person up on the msg_cpy_block list 736 * due to the fact some one is copying out and they will get 737 * things moving again once the copy is completed. 738 */ 739 cvres = msg_rcvq_sleep(&qp->msg_cpy_block, 740 &msg_entry, &lock, qp); 741 error = msgq_check_err(qp, cvres); 742 if (error) { 743 goto msgrcv_out; 744 } 745 goto findmsg; 746 } 747 /* 748 * There isn't a message to copy out that matches the designated 749 * criteria. 750 */ 751 if (msgflg & IPC_NOWAIT) { 752 error = ENOMSG; 753 goto msgrcv_out; 754 } 755 msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0); 756 757 /* 758 * Wait for new message. We keep the negative and positive types 759 * separate for performance reasons. 
760 */ 761 msg_entry.msgw_snd_wake = 0; 762 if (msgtyp >= 0) { 763 cvres = msg_rcvq_sleep(&qp->msg_wait_snd[msg_hash], 764 &msg_entry, &lock, qp); 765 } else { 766 qp->msg_ngt_cnt++; 767 cvres = msg_rcvq_sleep(&qp->msg_wait_snd_ngt[msg_hash], 768 &msg_entry, &lock, qp); 769 qp->msg_ngt_cnt--; 770 } 771 772 if (!(error = msgq_check_err(qp, cvres))) { 773 goto findmsg; 774 } 775 776 msgrcv_out: 777 if (error) { 778 msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0); 779 if (msg_entry.msgw_snd_wake) { 780 msg_wakeup_rdr(qp, &qp->msg_fnd_sndr, 781 msg_entry.msgw_snd_wake); 782 } 783 ipc_rele(msq_svc, (kipc_perm_t *)qp); 784 return ((ssize_t)set_errno(error)); 785 } 786 ipc_rele(msq_svc, (kipc_perm_t *)qp); 787 return ((ssize_t)xtsz); 788 } 789 790 static int 791 msgq_check_err(kmsqid_t *qp, int cvres) 792 { 793 if (IPC_FREE(&qp->msg_perm)) { 794 return (EIDRM); 795 } 796 797 if (cvres == 0) { 798 return (EINTR); 799 } 800 801 return (0); 802 } 803 804 static int 805 msg_copyout(kmsqid_t *qp, long msgtyp, kmutex_t **lock, size_t *xtsz_ret, 806 size_t msgsz, struct msg *smp, struct ipcmsgbuf *msgp, int msgflg) 807 { 808 size_t xtsz; 809 STRUCT_HANDLE(ipcmsgbuf, umsgp); 810 model_t mdl = get_udatamodel(); 811 int copyerror = 0; 812 813 STRUCT_SET_HANDLE(umsgp, mdl, msgp); 814 if (msgsz < smp->msg_size) { 815 if ((msgflg & MSG_NOERROR) == 0) { 816 return (E2BIG); 817 } else { 818 xtsz = msgsz; 819 } 820 } else { 821 xtsz = smp->msg_size; 822 } 823 *xtsz_ret = xtsz; 824 825 /* 826 * To prevent a DOS attack we mark the message as being 827 * copied out and release mutex. When the copy is completed 828 * we need to acquire the mutex and make the appropriate updates. 
829 */ 830 ASSERT((smp->msg_flags & MSG_RCVCOPY) == 0); 831 smp->msg_flags |= MSG_RCVCOPY; 832 msg_hold(smp); 833 if (msgtyp < 0) { 834 ASSERT(qp->msg_neg_copy == 0); 835 qp->msg_neg_copy = 1; 836 } 837 mutex_exit(*lock); 838 839 if (mdl == DATAMODEL_NATIVE) { 840 copyerror = copyout(&smp->msg_type, msgp, 841 sizeof (smp->msg_type)); 842 } else { 843 /* 844 * 32-bit callers need an imploded msg type. 845 */ 846 int32_t msg_type32 = smp->msg_type; 847 848 copyerror = copyout(&msg_type32, msgp, 849 sizeof (msg_type32)); 850 } 851 852 if (copyerror == 0 && xtsz) { 853 copyerror = copyout(smp->msg_addr, 854 STRUCT_FADDR(umsgp, mtext), xtsz); 855 } 856 857 /* 858 * Reclaim the mutex and make sure the message queue still exists. 859 */ 860 861 *lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id); 862 if (msgtyp < 0) { 863 qp->msg_neg_copy = 0; 864 } 865 ASSERT(smp->msg_flags & MSG_RCVCOPY); 866 smp->msg_flags &= ~MSG_RCVCOPY; 867 msg_rele(smp); 868 if (IPC_FREE(&qp->msg_perm)) { 869 return (EIDRM); 870 } 871 if (copyerror) { 872 return (EFAULT); 873 } 874 qp->msg_lrpid = ttoproc(curthread)->p_pid; 875 qp->msg_rtime = gethrestime_sec(); 876 msgunlink(qp, smp); 877 return (0); 878 } 879 880 static struct msg * 881 msgrcv_lookup(kmsqid_t *qp, long msgtyp) 882 { 883 struct msg *smp = NULL; 884 long qp_low; 885 struct msg *mp; /* ptr to msg on q */ 886 long low_msgtype; 887 static struct msg neg_copy_smp; 888 889 mp = list_head(&qp->msg_list); 890 if (msgtyp == 0) { 891 smp = mp; 892 } else { 893 qp_low = qp->msg_lowest_type; 894 if (msgtyp > 0) { 895 /* 896 * If our lowest possible message type is larger than 897 * the message type desired, then we know there is 898 * no entry present. 899 */ 900 if (qp_low > msgtyp) { 901 return (NULL); 902 } 903 904 for (; mp; mp = list_next(&qp->msg_list, mp)) { 905 if (msgtyp == mp->msg_type) { 906 smp = mp; 907 break; 908 } 909 } 910 } else { 911 /* 912 * We have kept track of the lowest possible message 913 * type on the send queue. 
This allows us to terminate 914 * the search early if we find a message type of that 915 * type. Note, the lowest type may not be the actual 916 * lowest value in the system, it is only guaranteed 917 * that there isn't a value lower than that. 918 */ 919 low_msgtype = -msgtyp; 920 if (low_msgtype < qp_low) { 921 return (NULL); 922 } 923 if (qp->msg_neg_copy) { 924 neg_copy_smp.msg_flags = MSG_RCVCOPY; 925 return (&neg_copy_smp); 926 } 927 for (; mp; mp = list_next(&qp->msg_list, mp)) { 928 if (mp->msg_type <= low_msgtype && 929 !(smp && smp->msg_type <= mp->msg_type)) { 930 smp = mp; 931 low_msgtype = mp->msg_type; 932 if (low_msgtype == qp_low) { 933 break; 934 } 935 } 936 } 937 if (smp) { 938 /* 939 * Update the lowest message type. 940 */ 941 qp->msg_lowest_type = smp->msg_type; 942 } 943 } 944 } 945 return (smp); 946 } 947 948 /* 949 * msgids system call. 950 */ 951 static int 952 msgids(int *buf, uint_t nids, uint_t *pnids) 953 { 954 int error; 955 956 if (error = ipc_ids(msq_svc, buf, nids, pnids)) 957 return (set_errno(error)); 958 959 return (0); 960 } 961 962 #define RND(x) roundup((x), sizeof (size_t)) 963 #define RND32(x) roundup((x), sizeof (size32_t)) 964 965 /* 966 * msgsnap system call. 
967 */ 968 static int 969 msgsnap(int msqid, caddr_t buf, size_t bufsz, long msgtyp) 970 { 971 struct msg *mp; /* ptr to msg on q */ 972 kmsqid_t *qp; /* ptr to associated q */ 973 kmutex_t *lock; 974 size_t size; 975 size_t nmsg; 976 struct msg **snaplist; 977 int error, i; 978 model_t mdl = get_udatamodel(); 979 STRUCT_DECL(msgsnap_head, head); 980 STRUCT_DECL(msgsnap_mhead, mhead); 981 982 STRUCT_INIT(head, mdl); 983 STRUCT_INIT(mhead, mdl); 984 985 if (bufsz < STRUCT_SIZE(head)) 986 return (set_errno(EINVAL)); 987 988 if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL) 989 return (set_errno(EINVAL)); 990 991 if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED())) { 992 mutex_exit(lock); 993 return (set_errno(error)); 994 } 995 ipc_hold(msq_svc, (kipc_perm_t *)qp); 996 997 /* 998 * First compute the required buffer size and 999 * the number of messages on the queue. 1000 */ 1001 size = nmsg = 0; 1002 for (mp = list_head(&qp->msg_list); mp; 1003 mp = list_next(&qp->msg_list, mp)) { 1004 if (msgtyp == 0 || 1005 (msgtyp > 0 && msgtyp == mp->msg_type) || 1006 (msgtyp < 0 && mp->msg_type <= -msgtyp)) { 1007 nmsg++; 1008 if (mdl == DATAMODEL_NATIVE) 1009 size += RND(mp->msg_size); 1010 else 1011 size += RND32(mp->msg_size); 1012 } 1013 } 1014 1015 size += STRUCT_SIZE(head) + nmsg * STRUCT_SIZE(mhead); 1016 if (size > bufsz) 1017 nmsg = 0; 1018 1019 if (nmsg > 0) { 1020 /* 1021 * Mark the messages as being copied. 1022 */ 1023 snaplist = (struct msg **)kmem_alloc(nmsg * 1024 sizeof (struct msg *), KM_SLEEP); 1025 i = 0; 1026 for (mp = list_head(&qp->msg_list); mp; 1027 mp = list_next(&qp->msg_list, mp)) { 1028 if (msgtyp == 0 || 1029 (msgtyp > 0 && msgtyp == mp->msg_type) || 1030 (msgtyp < 0 && mp->msg_type <= -msgtyp)) { 1031 msg_hold(mp); 1032 snaplist[i] = mp; 1033 i++; 1034 } 1035 } 1036 } 1037 mutex_exit(lock); 1038 1039 /* 1040 * Copy out the buffer header. 
1041 */ 1042 STRUCT_FSET(head, msgsnap_size, size); 1043 STRUCT_FSET(head, msgsnap_nmsg, nmsg); 1044 if (copyout(STRUCT_BUF(head), buf, STRUCT_SIZE(head))) 1045 error = EFAULT; 1046 1047 buf += STRUCT_SIZE(head); 1048 1049 /* 1050 * Now copy out the messages one by one. 1051 */ 1052 for (i = 0; i < nmsg; i++) { 1053 mp = snaplist[i]; 1054 if (error == 0) { 1055 STRUCT_FSET(mhead, msgsnap_mlen, mp->msg_size); 1056 STRUCT_FSET(mhead, msgsnap_mtype, mp->msg_type); 1057 if (copyout(STRUCT_BUF(mhead), buf, STRUCT_SIZE(mhead))) 1058 error = EFAULT; 1059 buf += STRUCT_SIZE(mhead); 1060 1061 if (error == 0 && 1062 mp->msg_size != 0 && 1063 copyout(mp->msg_addr, buf, mp->msg_size)) 1064 error = EFAULT; 1065 if (mdl == DATAMODEL_NATIVE) 1066 buf += RND(mp->msg_size); 1067 else 1068 buf += RND32(mp->msg_size); 1069 } 1070 lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id); 1071 msg_rele(mp); 1072 /* Check for msg q deleted or reallocated */ 1073 if (IPC_FREE(&qp->msg_perm)) 1074 error = EIDRM; 1075 mutex_exit(lock); 1076 } 1077 1078 (void) ipc_lock(msq_svc, qp->msg_perm.ipc_id); 1079 ipc_rele(msq_svc, (kipc_perm_t *)qp); 1080 1081 if (nmsg > 0) 1082 kmem_free(snaplist, nmsg * sizeof (struct msg *)); 1083 1084 if (error) 1085 return (set_errno(error)); 1086 return (0); 1087 } 1088 1089 #define MSG_PREALLOC_LIMIT 8192 1090 1091 /* 1092 * msgsnd system call. 
1093 */ 1094 static int 1095 msgsnd(int msqid, struct ipcmsgbuf *msgp, size_t msgsz, int msgflg) 1096 { 1097 kmsqid_t *qp; 1098 kmutex_t *lock = NULL; 1099 struct msg *mp = NULL; 1100 long type; 1101 int error = 0, wait_wakeup = 0; 1102 msgq_wakeup_t msg_entry; 1103 model_t mdl = get_udatamodel(); 1104 STRUCT_HANDLE(ipcmsgbuf, umsgp); 1105 1106 CPU_STATS_ADDQ(CPU, sys, msg, 1); /* bump msg send/rcv count */ 1107 STRUCT_SET_HANDLE(umsgp, mdl, msgp); 1108 1109 if (mdl == DATAMODEL_NATIVE) { 1110 if (copyin(msgp, &type, sizeof (type))) 1111 return (set_errno(EFAULT)); 1112 } else { 1113 int32_t type32; 1114 if (copyin(msgp, &type32, sizeof (type32))) 1115 return (set_errno(EFAULT)); 1116 type = type32; 1117 } 1118 1119 if (type < 1) 1120 return (set_errno(EINVAL)); 1121 1122 /* 1123 * We want the value here large enough that most of the 1124 * the message operations will use the "lockless" path, 1125 * but small enough that a user can not reserve large 1126 * chunks of kernel memory unless they have a valid 1127 * reason to. 1128 */ 1129 if (msgsz <= MSG_PREALLOC_LIMIT) { 1130 /* 1131 * We are small enough that we can afford to do the 1132 * allocation now. This saves dropping the lock 1133 * and then reacquiring the lock. 
1134 */ 1135 mp = kmem_zalloc(sizeof (struct msg), KM_SLEEP); 1136 mp->msg_copycnt = 1; 1137 mp->msg_size = msgsz; 1138 if (msgsz) { 1139 mp->msg_addr = kmem_alloc(msgsz, KM_SLEEP); 1140 if (copyin(STRUCT_FADDR(umsgp, mtext), 1141 mp->msg_addr, msgsz) == -1) { 1142 error = EFAULT; 1143 goto msgsnd_out; 1144 } 1145 } 1146 } 1147 1148 if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL) { 1149 error = EINVAL; 1150 goto msgsnd_out; 1151 } 1152 1153 ipc_hold(msq_svc, (kipc_perm_t *)qp); 1154 1155 if (msgsz > qp->msg_qbytes) { 1156 error = EINVAL; 1157 goto msgsnd_out; 1158 } 1159 1160 if (error = ipcperm_access(&qp->msg_perm, MSG_W, CRED())) 1161 goto msgsnd_out; 1162 1163 top: 1164 /* 1165 * Allocate space on q, message header, & buffer space. 1166 */ 1167 ASSERT(qp->msg_qnum <= qp->msg_qmax); 1168 while ((msgsz > qp->msg_qbytes - qp->msg_cbytes) || 1169 (qp->msg_qnum == qp->msg_qmax)) { 1170 int cvres; 1171 1172 if (msgflg & IPC_NOWAIT) { 1173 error = EAGAIN; 1174 goto msgsnd_out; 1175 } 1176 1177 wait_wakeup = 0; 1178 qp->msg_snd_cnt++; 1179 msg_entry.msgw_snd_size = msgsz; 1180 msg_entry.msgw_thrd = curthread; 1181 msg_entry.msgw_type = type; 1182 cv_init(&msg_entry.msgw_wake_cv, NULL, 0, NULL); 1183 list_insert_tail(&qp->msg_wait_rcv, &msg_entry); 1184 if (qp->msg_snd_smallest > msgsz) 1185 qp->msg_snd_smallest = msgsz; 1186 cvres = cv_wait_sig(&msg_entry.msgw_wake_cv, lock); 1187 lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, lock); 1188 qp->msg_snd_cnt--; 1189 if (list_link_active(&msg_entry.msgw_list)) 1190 list_remove(&qp->msg_wait_rcv, &msg_entry); 1191 if (error = msgq_check_err(qp, cvres)) { 1192 goto msgsnd_out; 1193 } 1194 wait_wakeup = 1; 1195 } 1196 1197 if (mp == NULL) { 1198 int failure; 1199 1200 mutex_exit(lock); 1201 ASSERT(msgsz > 0); 1202 mp = kmem_zalloc(sizeof (struct msg), KM_SLEEP); 1203 mp->msg_addr = kmem_alloc(msgsz, KM_SLEEP); 1204 mp->msg_size = msgsz; 1205 mp->msg_copycnt = 1; 1206 1207 failure = 
(copyin(STRUCT_FADDR(umsgp, mtext), 1208 mp->msg_addr, msgsz) == -1); 1209 lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id); 1210 if (IPC_FREE(&qp->msg_perm)) { 1211 error = EIDRM; 1212 goto msgsnd_out; 1213 } 1214 if (failure) { 1215 error = EFAULT; 1216 goto msgsnd_out; 1217 } 1218 goto top; 1219 } 1220 1221 /* 1222 * Everything is available, put msg on q. 1223 */ 1224 qp->msg_qnum++; 1225 qp->msg_cbytes += msgsz; 1226 qp->msg_lspid = curproc->p_pid; 1227 qp->msg_stime = gethrestime_sec(); 1228 mp->msg_type = type; 1229 if (qp->msg_lowest_type > type) 1230 qp->msg_lowest_type = type; 1231 list_insert_tail(&qp->msg_list, mp); 1232 /* 1233 * Get the proper receiver going. 1234 */ 1235 msg_wakeup_rdr(qp, &qp->msg_fnd_sndr, type); 1236 1237 msgsnd_out: 1238 /* 1239 * We were woken up from the send wait list, but an 1240 * an error occured on placing the message onto the 1241 * msg queue. Given that, we need to do the wakeup 1242 * dance again. 1243 */ 1244 1245 if (wait_wakeup && error) { 1246 msg_wakeup_senders(qp); 1247 } 1248 if (lock) 1249 ipc_rele(msq_svc, (kipc_perm_t *)qp); /* drops lock */ 1250 1251 if (error) { 1252 if (mp) 1253 msg_rele(mp); 1254 return (set_errno(error)); 1255 } 1256 1257 return (0); 1258 } 1259 1260 static void 1261 msg_wakeup_rdr(kmsqid_t *qp, msg_select_t **flist, long type) 1262 { 1263 msg_select_t *walker = *flist; 1264 msgq_wakeup_t *wakeup; 1265 uint_t msg_hash; 1266 1267 msg_hash = msg_type_hash(type); 1268 1269 do { 1270 wakeup = walker->selection(qp, msg_hash, type); 1271 walker = walker->next_selection; 1272 } while (!wakeup && walker != *flist); 1273 1274 *flist = (*flist)->next_selection; 1275 if (wakeup) { 1276 if (type) { 1277 wakeup->msgw_snd_wake = type; 1278 } 1279 cv_signal(&wakeup->msgw_wake_cv); 1280 } 1281 } 1282 1283 static uint_t 1284 msg_type_hash(long msg_type) 1285 { 1286 if (msg_type < 0) { 1287 long hash = -msg_type / MSG_NEG_INTERVAL; 1288 /* 1289 * Negative message types are hashed over an 1290 * interval. 
Any message type that hashes 1291 * beyond MSG_MAX_QNUM is automatically placed 1292 * in the last bucket. 1293 */ 1294 if (hash > MSG_MAX_QNUM) 1295 hash = MSG_MAX_QNUM; 1296 return (hash); 1297 } 1298 1299 /* 1300 * 0 or positive message type. The first bucket is reserved for 1301 * message receivers of type 0, the other buckets we hash into. 1302 */ 1303 if (msg_type) 1304 return (1 + (msg_type % MSG_MAX_QNUM)); 1305 return (0); 1306 } 1307 1308 /* 1309 * Routines to see if we have a receiver of type 0 either blocked waiting 1310 * for a message. Simply return the first guy on the list. 1311 */ 1312 1313 static msgq_wakeup_t * 1314 /* ARGSUSED */ 1315 msg_fnd_any_snd(kmsqid_t *qp, int msg_hash, long type) 1316 { 1317 msgq_wakeup_t *walker; 1318 1319 walker = list_head(&qp->msg_wait_snd[0]); 1320 1321 if (walker) 1322 list_remove(&qp->msg_wait_snd[0], walker); 1323 return (walker); 1324 } 1325 1326 static msgq_wakeup_t * 1327 /* ARGSUSED */ 1328 msg_fnd_any_rdr(kmsqid_t *qp, int msg_hash, long type) 1329 { 1330 msgq_wakeup_t *walker; 1331 1332 walker = list_head(&qp->msg_cpy_block); 1333 if (walker) 1334 list_remove(&qp->msg_cpy_block, walker); 1335 return (walker); 1336 } 1337 1338 static msgq_wakeup_t * 1339 msg_fnd_spc_snd(kmsqid_t *qp, int msg_hash, long type) 1340 { 1341 msgq_wakeup_t *walker; 1342 1343 walker = list_head(&qp->msg_wait_snd[msg_hash]); 1344 1345 while (walker && walker->msgw_type != type) 1346 walker = list_next(&qp->msg_wait_snd[msg_hash], walker); 1347 if (walker) 1348 list_remove(&qp->msg_wait_snd[msg_hash], walker); 1349 return (walker); 1350 } 1351 1352 /* ARGSUSED */ 1353 static msgq_wakeup_t * 1354 msg_fnd_neg_snd(kmsqid_t *qp, int msg_hash, long type) 1355 { 1356 msgq_wakeup_t *qptr; 1357 int count; 1358 int check_index; 1359 int neg_index; 1360 int nbuckets; 1361 1362 if (!qp->msg_ngt_cnt) { 1363 return (NULL); 1364 } 1365 neg_index = msg_type_hash(-type); 1366 1367 /* 1368 * Check for a match among the negative type queues. 
Any buckets 1369 * at neg_index or larger can match the type. Use the last send 1370 * time to randomize the starting bucket to prevent starvation. 1371 * Search all buckets from neg_index to MSG_MAX_QNUM, starting 1372 * from the random starting point, and wrapping around after 1373 * MSG_MAX_QNUM. 1374 */ 1375 1376 nbuckets = MSG_MAX_QNUM - neg_index + 1; 1377 check_index = neg_index + (qp->msg_stime % nbuckets); 1378 1379 for (count = nbuckets; count > 0; count--) { 1380 qptr = list_head(&qp->msg_wait_snd_ngt[check_index]); 1381 while (qptr) { 1382 /* 1383 * The lowest hash bucket may actually contain 1384 * message types that are not valid for this 1385 * request. This can happen due to the fact that 1386 * the message buckets actually contain a consecutive 1387 * range of types. 1388 */ 1389 if (-qptr->msgw_type >= type) { 1390 list_remove(&qp->msg_wait_snd_ngt[check_index], 1391 qptr); 1392 return (qptr); 1393 } 1394 qptr = list_next(&qp->msg_wait_snd_ngt[check_index], 1395 qptr); 1396 } 1397 if (++check_index > MSG_MAX_QNUM) { 1398 check_index = neg_index; 1399 } 1400 } 1401 return (NULL); 1402 } 1403 1404 static int 1405 msg_rcvq_sleep(list_t *queue, msgq_wakeup_t *entry, kmutex_t **lock, 1406 kmsqid_t *qp) 1407 { 1408 int cvres; 1409 1410 cv_init(&entry->msgw_wake_cv, NULL, 0, NULL); 1411 1412 list_insert_tail(queue, entry); 1413 1414 qp->msg_rcv_cnt++; 1415 cvres = cv_wait_sig(&entry->msgw_wake_cv, *lock); 1416 *lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, *lock); 1417 qp->msg_rcv_cnt--; 1418 1419 if (list_link_active(&entry->msgw_list)) { 1420 /* 1421 * We woke up unexpectedly, remove ourself. 
1422 */ 1423 list_remove(queue, entry); 1424 } 1425 1426 return (cvres); 1427 } 1428 1429 static void 1430 msg_rcvq_wakeup_all(list_t *q_ptr) 1431 { 1432 msgq_wakeup_t *q_walk; 1433 1434 while (q_walk = list_head(q_ptr)) { 1435 list_remove(q_ptr, q_walk); 1436 cv_signal(&q_walk->msgw_wake_cv); 1437 } 1438 } 1439 1440 /* 1441 * msgsys - System entry point for msgctl, msgget, msgrcv, and msgsnd 1442 * system calls. 1443 */ 1444 static ssize_t 1445 msgsys(int opcode, uintptr_t a1, uintptr_t a2, uintptr_t a3, 1446 uintptr_t a4, uintptr_t a5) 1447 { 1448 ssize_t error; 1449 1450 switch (opcode) { 1451 case MSGGET: 1452 error = msgget((key_t)a1, (int)a2); 1453 break; 1454 case MSGCTL: 1455 error = msgctl((int)a1, (int)a2, (void *)a3); 1456 break; 1457 case MSGRCV: 1458 error = msgrcv((int)a1, (struct ipcmsgbuf *)a2, 1459 (size_t)a3, (long)a4, (int)a5); 1460 break; 1461 case MSGSND: 1462 error = msgsnd((int)a1, (struct ipcmsgbuf *)a2, 1463 (size_t)a3, (int)a4); 1464 break; 1465 case MSGIDS: 1466 error = msgids((int *)a1, (uint_t)a2, (uint_t *)a3); 1467 break; 1468 case MSGSNAP: 1469 error = msgsnap((int)a1, (caddr_t)a2, (size_t)a3, (long)a4); 1470 break; 1471 default: 1472 error = set_errno(EINVAL); 1473 break; 1474 } 1475 1476 return (error); 1477 } 1478 1479 /* 1480 * Determine if a writer who is waiting can process its message. If so 1481 * wake it up. 1482 */ 1483 static void 1484 msg_wakeup_senders(kmsqid_t *qp) 1485 1486 { 1487 struct msgq_wakeup *ptr, *optr; 1488 size_t avail, smallest; 1489 int msgs_out; 1490 1491 /* 1492 * Is there a writer waiting, and if so, can it be serviced? If 1493 * not return back to the caller. 
1494 */ 1495 if (IPC_FREE(&qp->msg_perm) || qp->msg_qnum >= qp->msg_qmax) 1496 return; 1497 1498 avail = qp->msg_qbytes - qp->msg_cbytes; 1499 if (avail < qp->msg_snd_smallest) 1500 return; 1501 1502 ptr = list_head(&qp->msg_wait_rcv); 1503 if (ptr == NULL) { 1504 qp->msg_snd_smallest = MSG_SMALL_INIT; 1505 return; 1506 } 1507 optr = ptr; 1508 1509 /* 1510 * smallest: minimum message size of all queued writers 1511 * 1512 * avail: amount of space left on the msgq 1513 * if all the writers we have woken up are successful. 1514 * 1515 * msgs_out: is the number of messages on the message queue if 1516 * all the writers we have woken up are successful. 1517 */ 1518 1519 smallest = MSG_SMALL_INIT; 1520 msgs_out = qp->msg_qnum; 1521 while (ptr) { 1522 ptr = list_next(&qp->msg_wait_rcv, ptr); 1523 if (optr->msgw_snd_size <= avail) { 1524 list_remove(&qp->msg_wait_rcv, optr); 1525 avail -= optr->msgw_snd_size; 1526 cv_signal(&optr->msgw_wake_cv); 1527 msgs_out++; 1528 if (msgs_out == qp->msg_qmax || 1529 avail < qp->msg_snd_smallest) 1530 break; 1531 } else { 1532 if (smallest > optr->msgw_snd_size) 1533 smallest = optr->msgw_snd_size; 1534 } 1535 optr = ptr; 1536 } 1537 1538 /* 1539 * Reset the smallest message size if the entire list has been visited 1540 */ 1541 if (ptr == NULL && smallest != MSG_SMALL_INIT) 1542 qp->msg_snd_smallest = smallest; 1543 } 1544 1545 #ifdef _SYSCALL32_IMPL 1546 /* 1547 * msgsys32 - System entry point for msgctl, msgget, msgrcv, and msgsnd 1548 * system calls for 32-bit callers on LP64 kernel. 
1549 */ 1550 static ssize32_t 1551 msgsys32(int opcode, uint32_t a1, uint32_t a2, uint32_t a3, 1552 uint32_t a4, uint32_t a5) 1553 { 1554 ssize_t error; 1555 1556 switch (opcode) { 1557 case MSGGET: 1558 error = msgget((key_t)a1, (int)a2); 1559 break; 1560 case MSGCTL: 1561 error = msgctl((int)a1, (int)a2, (void *)(uintptr_t)a3); 1562 break; 1563 case MSGRCV: 1564 error = msgrcv((int)a1, (struct ipcmsgbuf *)(uintptr_t)a2, 1565 (size_t)a3, (long)(int32_t)a4, (int)a5); 1566 break; 1567 case MSGSND: 1568 error = msgsnd((int)a1, (struct ipcmsgbuf *)(uintptr_t)a2, 1569 (size_t)(int32_t)a3, (int)a4); 1570 break; 1571 case MSGIDS: 1572 error = msgids((int *)(uintptr_t)a1, (uint_t)a2, 1573 (uint_t *)(uintptr_t)a3); 1574 break; 1575 case MSGSNAP: 1576 error = msgsnap((int)a1, (caddr_t)(uintptr_t)a2, (size_t)a3, 1577 (long)(int32_t)a4); 1578 break; 1579 default: 1580 error = set_errno(EINVAL); 1581 break; 1582 } 1583 1584 return (error); 1585 } 1586 #endif /* SYSCALL32_IMPL */