1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21 /*
22 * Copyright 2010 Sun Microsystems, Inc. All rights reserved.
23 * Use is subject to license terms.
24 */
25
26 /* Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T */
27 /* All Rights Reserved */
28
29
30 /*
31 * Inter-Process Communication Message Facility.
32 *
33 * See os/ipc.c for a description of common IPC functionality.
34 *
35 * Resource controls
36 * -----------------
37 *
38 * Control: zone.max-msg-ids (rc_zone_msgmni)
39 * Description: Maximum number of message queue ids allowed a zone.
40 *
41 * When msgget() is used to allocate a message queue, one id is
42 * allocated. If the id allocation doesn't succeed, msgget() fails
43 * and errno is set to ENOSPC. Upon successful msgctl(, IPC_RMID)
44 * the id is deallocated.
45 *
46 * Control: project.max-msg-ids (rc_project_msgmni)
47 * Description: Maximum number of message queue ids allowed a project.
48 *
49 * When msgget() is used to allocate a message queue, one id is
50 * allocated. If the id allocation doesn't succeed, msgget() fails
51 * and errno is set to ENOSPC. Upon successful msgctl(, IPC_RMID)
52 * the id is deallocated.
53 *
54 * Control: process.max-msg-qbytes (rc_process_msgmnb)
55 * Description: Maximum number of bytes of messages on a message queue.
56 *
57 * When msgget() successfully allocates a message queue, the minimum
58 * enforced value of this limit is used to initialize msg_qbytes.
59 *
60 * Control: process.max-msg-messages (rc_process_msgtql)
61 * Description: Maximum number of messages on a message queue.
62 *
63 * When msgget() successfully allocates a message queue, the minimum
64 * enforced value of this limit is used to initialize a per-queue
65 * limit on the number of messages.
66 */
67
68 #include <sys/types.h>
69 #include <sys/t_lock.h>
70 #include <sys/param.h>
71 #include <sys/cred.h>
72 #include <sys/user.h>
73 #include <sys/proc.h>
74 #include <sys/time.h>
75 #include <sys/ipc.h>
76 #include <sys/ipc_impl.h>
77 #include <sys/msg.h>
78 #include <sys/msg_impl.h>
79 #include <sys/list.h>
80 #include <sys/systm.h>
81 #include <sys/sysmacros.h>
82 #include <sys/cpuvar.h>
83 #include <sys/kmem.h>
84 #include <sys/ddi.h>
85 #include <sys/errno.h>
86 #include <sys/cmn_err.h>
87 #include <sys/debug.h>
88 #include <sys/project.h>
89 #include <sys/modctl.h>
90 #include <sys/syscall.h>
91 #include <sys/policy.h>
92 #include <sys/zone.h>
93
94 #include <c2/audit.h>
95
96 /*
97 * The following tunables are obsolete. Though for compatibility we
98 * still read and interpret msginfo_msgmnb, msginfo_msgmni, and
99 * msginfo_msgtql (see os/project.c and os/rctl_proc.c), the preferred
100 * mechanism for administrating the IPC Message facility is through the
101 * resource controls described at the top of this file.
102 */
103 size_t msginfo_msgmax = 2048; /* (obsolete) */
104 size_t msginfo_msgmnb = 4096; /* (obsolete) */
105 int msginfo_msgmni = 50; /* (obsolete) */
106 int msginfo_msgtql = 40; /* (obsolete) */
107 int msginfo_msgssz = 8; /* (obsolete) */
108 int msginfo_msgmap = 0; /* (obsolete) */
109 ushort_t msginfo_msgseg = 1024; /* (obsolete) */
110
111 extern rctl_hndl_t rc_zone_msgmni;
112 extern rctl_hndl_t rc_project_msgmni;
113 extern rctl_hndl_t rc_process_msgmnb;
114 extern rctl_hndl_t rc_process_msgtql;
115 static ipc_service_t *msq_svc;
116 static zone_key_t msg_zone_key;
117
118 static void msg_dtor(kipc_perm_t *);
119 static void msg_rmid(kipc_perm_t *);
120 static void msg_remove_zone(zoneid_t, void *);
121
122 /*
123 * Module linkage information for the kernel.
124 */
125 static ssize_t msgsys(int opcode, uintptr_t a0, uintptr_t a1, uintptr_t a2,
126 uintptr_t a4, uintptr_t a5);
127
128 static struct sysent ipcmsg_sysent = {
129 6,
130 #ifdef _LP64
131 SE_ARGC | SE_NOUNLOAD | SE_64RVAL,
132 #else
133 SE_ARGC | SE_NOUNLOAD | SE_32RVAL1,
134 #endif
135 (int (*)())msgsys
136 };
137
138 #ifdef _SYSCALL32_IMPL
139 static ssize32_t msgsys32(int opcode, uint32_t a0, uint32_t a1, uint32_t a2,
140 uint32_t a4, uint32_t a5);
141
142 static struct sysent ipcmsg_sysent32 = {
143 6,
144 SE_ARGC | SE_NOUNLOAD | SE_32RVAL1,
145 (int (*)())msgsys32
146 };
147 #endif /* _SYSCALL32_IMPL */
148
149 static struct modlsys modlsys = {
150 &mod_syscallops, "System V message facility", &ipcmsg_sysent
151 };
152
153 #ifdef _SYSCALL32_IMPL
154 static struct modlsys modlsys32 = {
155 &mod_syscallops32, "32-bit System V message facility", &ipcmsg_sysent32
156 };
157 #endif
158
159 /*
160 * Big Theory statement for message queue correctness
161 *
 * The msgrcv and msgsnd functions no longer use cv_broadcast to wake up
 * receivers who are waiting for an event. Using the cv_broadcast method
 * resulted in negative scaling when the number of waiting receivers was large
 * (the thundering herd problem). Instead, the receivers waiting to receive a
166 * message are now linked in a queue-like fashion and awaken one at a time in
167 * a controlled manner.
168 *
169 * Receivers can block on two different classes of waiting list:
170 * 1) "sendwait" list, which is the more complex list of the two. The
171 * receiver will be awakened by a sender posting a new message. There
172 * are two types of "sendwait" list used:
173 * a) msg_wait_snd: handles all receivers who are looking for
174 * a message type >= 0, but was unable to locate a match.
175 *
176 * slot 0: reserved for receivers that have designated they
177 * will take any message type.
178 * rest: consist of receivers requesting a specific type
179 * but the type was not present. The entries are
180 * hashed into a bucket in an attempt to keep
181 * any list search relatively short.
182 * b) msg_wait_snd_ngt: handles all receivers that have designated
183 * a negative message type. Unlike msg_wait_snd, the hash bucket
184 * serves a range of negative message types (-1 to -5, -6 to -10
185 * and so forth), where the last bucket is reserved for all the
186 * negative message types that hash outside of MSG_MAX_QNUM - 1.
187 * This is done this way to simplify the operation of locating a
188 * negative message type.
189 *
190 * 2) "copyout" list, where the receiver is awakened by another
191 * receiver after a message is copied out. This is a linked list
192 * of waiters that are awakened one at a time. Although the solution is
193 * not optimal, the complexity that would be added in for waking
194 * up the right entry far exceeds any potential pay back (too many
195 * correctness and corner case issues).
196 *
197 * The lists are doubly linked. In the case of the "sendwait"
198 * list, this allows the thread to remove itself from the list without having
199 * to traverse the list. In the case of the "copyout" list it simply allows
200 * us to use common functions with the "sendwait" list.
201 *
202 * To make sure receivers are not hung out to dry, we must guarantee:
203 * 1. If any queued message matches any receiver, then at least one
204 * matching receiver must be processing the request.
 *	2. Blocking on the copyout queue is only temporary while messages
 *	   are being copied out. The process is guaranteed to wake up
 *	   when it gets to the front of the queue (copyout is a FIFO).
208 *
209 * Rules for blocking and waking up:
210 * 1. A receiver entering msgrcv must examine all messages for a match
211 * before blocking on a sendwait queue.
 *	2. If the receiver blocks because the message it chose is already
 *	   being copied out, then when it wakes up it needs to start
 *	   checking the messages from the beginning.
 *	3) Whenever a process returns from msgrcv for any reason, if it
216 * had attempted to copy a message or blocked waiting for a copy
217 * to complete it needs to wakeup the next receiver blocked on
218 * a copy out.
219 * 4) When a message is sent, the sender selects a process waiting
220 * for that type of message. This selection process rotates between
221 * receivers types of 0, negative and positive to prevent starvation of
222 * any one particular receiver type.
223 * 5) The following are the scenarios for processes that are awakened
224 * by a msgsnd:
225 * a) The process finds the message and is able to copy
226 * it out. Once complete, the process returns.
227 * b) The message that was sent that triggered the wakeup is no
228 * longer available (another process found the message first).
229 * We issue a wakeup on copy queue and then go back to
230 * sleep waiting for another matching message to be sent.
231 * c) The message that was supposed to be processed was
232 * already serviced by another process. However a different
233 * message is present which we can service. The message
234 * is copied and the process returns.
235 * d) The message is found, but some sort of error occurs that
236 * prevents the message from being copied. The receiver
237 * wakes up the next sender that can service this message
238 * type and returns an error to the caller.
239 * e) The message is found, but it is marked as being copied
240 * out. The receiver then goes to sleep on the copyout
241 * queue where it will be awakened again sometime in the future.
242 *
243 *
244 * 6) Whenever a message is found that matches the message type designated,
245 * but is being copied out we have to block on the copyout queue.
246 * After process copying finishes the copy out, it must wakeup (either
247 * directly or indirectly) all receivers who blocked on its copyout,
248 * so they are guaranteed a chance to examine the remaining messages.
249 * This is implemented via a chain of wakeups: Y wakes X, who wakes Z,
250 * and so on. The chain cannot be broken. This leads to the following
251 * cases:
 *		a) A receiver is finished copying the message (or encountered
 *		   an error); the first entry on the copyout queue is woken
 *		   up.
255 * b) When the receiver is woken up, it attempts to locate
256 * a message type match.
257 * c) If a message type is found and
258 * -- MSG_RCVCOPY flag is not set, the message is
259 * marked for copying out. Regardless of the copyout
260 * success the next entry on the copyout queue is
261 * awakened and the operation is completed.
262 * -- MSG_RCVCOPY is set, we simply go back to sleep again
263 * on the copyout queue.
264 * d) If the message type is not found then we wakeup the next
265 * process on the copyout queue.
 *	7) If a msgsnd is unable to complete for any of the following reasons
267 * a) the msgq has no space for the message
268 * b) the maximum number of messages allowed has been reached
269 * then one of two things happen:
270 * 1) If the passed in msg_flag has IPC_NOWAIT set, then
271 * an error is returned.
 *		2) If the IPC_NOWAIT bit is not set in msg_flag, then
 *		   the thread is put to sleep until the request can be
 *		   serviced.
275 * 8) When waking a thread waiting to send a message, a check is done to
276 * verify that the operation being asked for by the thread will complete.
277 * This decision making process is done in a loop where the oldest request
278 * is checked first. The search will continue until there is no more
279 * room on the msgq or we have checked all the waiters.
280 */
281
282 static uint_t msg_type_hash(long);
283 static int msgq_check_err(kmsqid_t *qp, int cvres);
284 static int msg_rcvq_sleep(list_t *, msgq_wakeup_t *, kmutex_t **,
285 kmsqid_t *);
286 static int msg_copyout(kmsqid_t *, long, kmutex_t **, size_t *, size_t,
287 struct msg *, struct ipcmsgbuf *, int);
288 static void msg_rcvq_wakeup_all(list_t *);
289 static void msg_wakeup_senders(kmsqid_t *);
290 static void msg_wakeup_rdr(kmsqid_t *, msg_select_t **, long);
291 static msgq_wakeup_t *msg_fnd_any_snd(kmsqid_t *, int, long);
292 static msgq_wakeup_t *msg_fnd_any_rdr(kmsqid_t *, int, long);
293 static msgq_wakeup_t *msg_fnd_neg_snd(kmsqid_t *, int, long);
294 static msgq_wakeup_t *msg_fnd_spc_snd(kmsqid_t *, int, long);
295 static struct msg *msgrcv_lookup(kmsqid_t *, long);
296
297 msg_select_t msg_fnd_sndr[] = {
298 { msg_fnd_any_snd, &msg_fnd_sndr[1] },
299 { msg_fnd_spc_snd, &msg_fnd_sndr[2] },
300 { msg_fnd_neg_snd, &msg_fnd_sndr[0] }
301 };
302
303 msg_select_t msg_fnd_rdr[1] = {
304 { msg_fnd_any_rdr, &msg_fnd_rdr[0] },
305 };
306
307 static struct modlinkage modlinkage = {
308 MODREV_1,
309 { &modlsys,
310 #ifdef _SYSCALL32_IMPL
311 &modlsys32,
312 #endif
313 NULL
314 }
315 };
316
317 #define MSG_SMALL_INIT (size_t)-1
318 int
319 _init(void)
320 {
321 int result;
322
323 msq_svc = ipcs_create("msqids", rc_project_msgmni, rc_zone_msgmni,
324 sizeof (kmsqid_t), msg_dtor, msg_rmid, AT_IPC_MSG,
325 offsetof(ipc_rqty_t, ipcq_msgmni));
326 zone_key_create(&msg_zone_key, NULL, msg_remove_zone, NULL);
327
328 if ((result = mod_install(&modlinkage)) == 0)
329 return (0);
330
331 (void) zone_key_delete(msg_zone_key);
332 ipcs_destroy(msq_svc);
333
334 return (result);
335 }
336
337 int
338 _fini(void)
339 {
340 return (EBUSY);
341 }
342
343 int
344 _info(struct modinfo *modinfop)
345 {
346 return (mod_info(&modlinkage, modinfop));
347 }
348
349 static void
350 msg_dtor(kipc_perm_t *perm)
351 {
352 kmsqid_t *qp = (kmsqid_t *)perm;
353 int ii;
354
355 for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
356 ASSERT(list_is_empty(&qp->msg_wait_snd[ii]));
357 ASSERT(list_is_empty(&qp->msg_wait_snd_ngt[ii]));
358 list_destroy(&qp->msg_wait_snd[ii]);
359 list_destroy(&qp->msg_wait_snd_ngt[ii]);
360 }
361 ASSERT(list_is_empty(&qp->msg_cpy_block));
362 ASSERT(list_is_empty(&qp->msg_wait_rcv));
363 list_destroy(&qp->msg_cpy_block);
364 ASSERT(qp->msg_snd_cnt == 0);
365 ASSERT(qp->msg_cbytes == 0);
366 list_destroy(&qp->msg_list);
367 list_destroy(&qp->msg_wait_rcv);
368 }
369
370
371 #define msg_hold(mp) (mp)->msg_copycnt++
372
373 /*
374 * msg_rele - decrement the reference count on the message. When count
375 * reaches zero, free message header and contents.
376 */
377 static void
378 msg_rele(struct msg *mp)
379 {
380 ASSERT(mp->msg_copycnt > 0);
381 if (mp->msg_copycnt-- == 1) {
382 if (mp->msg_addr)
383 kmem_free(mp->msg_addr, mp->msg_size);
384 kmem_free(mp, sizeof (struct msg));
385 }
386 }
387
388 /*
389 * msgunlink - Unlink msg from queue, decrement byte count and wake up anyone
390 * waiting for free bytes on queue.
391 *
392 * Called with queue locked.
393 */
394 static void
395 msgunlink(kmsqid_t *qp, struct msg *mp)
396 {
397 list_remove(&qp->msg_list, mp);
398 qp->msg_qnum--;
399 qp->msg_cbytes -= mp->msg_size;
400 msg_rele(mp);
401
402 /* Wake up waiting writers */
403 msg_wakeup_senders(qp);
404 }
405
406 static void
407 msg_rmid(kipc_perm_t *perm)
408 {
409 kmsqid_t *qp = (kmsqid_t *)perm;
410 struct msg *mp;
411 int ii;
412
413
414 while ((mp = list_head(&qp->msg_list)) != NULL)
415 msgunlink(qp, mp);
416 ASSERT(qp->msg_cbytes == 0);
417
418 /*
419 * Wake up everyone who is in a wait state of some sort
420 * for this message queue.
421 */
422 for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
423 msg_rcvq_wakeup_all(&qp->msg_wait_snd[ii]);
424 msg_rcvq_wakeup_all(&qp->msg_wait_snd_ngt[ii]);
425 }
426 msg_rcvq_wakeup_all(&qp->msg_cpy_block);
427 msg_rcvq_wakeup_all(&qp->msg_wait_rcv);
428 }
429
430 /*
431 * msgctl system call.
432 *
433 * gets q lock (via ipc_lookup), releases before return.
434 * may call users of msg_lock
435 */
436 static int
437 msgctl(int msgid, int cmd, void *arg)
438 {
439 STRUCT_DECL(msqid_ds, ds); /* SVR4 queue work area */
440 kmsqid_t *qp; /* ptr to associated q */
441 int error;
442 struct cred *cr;
443 model_t mdl = get_udatamodel();
444 struct msqid_ds64 ds64;
445 kmutex_t *lock;
446 proc_t *pp = curproc;
447
448 STRUCT_INIT(ds, mdl);
449 cr = CRED();
450
451 /*
452 * Perform pre- or non-lookup actions (e.g. copyins, RMID).
453 */
454 switch (cmd) {
455 case IPC_SET:
456 if (copyin(arg, STRUCT_BUF(ds), STRUCT_SIZE(ds)))
457 return (set_errno(EFAULT));
458 break;
459
460 case IPC_SET64:
461 if (copyin(arg, &ds64, sizeof (struct msqid_ds64)))
462 return (set_errno(EFAULT));
463 break;
464
465 case IPC_RMID:
466 if (error = ipc_rmid(msq_svc, msgid, cr))
467 return (set_errno(error));
468 return (0);
469 }
470
471 /*
472 * get msqid_ds for this msgid
473 */
474 if ((lock = ipc_lookup(msq_svc, msgid, (kipc_perm_t **)&qp)) == NULL)
475 return (set_errno(EINVAL));
476
477 switch (cmd) {
478 case IPC_SET:
479 if (STRUCT_FGET(ds, msg_qbytes) > qp->msg_qbytes &&
480 secpolicy_ipc_config(cr) != 0) {
481 mutex_exit(lock);
482 return (set_errno(EPERM));
483 }
484 if (error = ipcperm_set(msq_svc, cr, &qp->msg_perm,
485 &STRUCT_BUF(ds)->msg_perm, mdl)) {
486 mutex_exit(lock);
487 return (set_errno(error));
488 }
489 qp->msg_qbytes = STRUCT_FGET(ds, msg_qbytes);
490 qp->msg_ctime = gethrestime_sec();
491 break;
492
493 case IPC_STAT:
494 if (error = ipcperm_access(&qp->msg_perm, MSG_R, cr)) {
495 mutex_exit(lock);
496 return (set_errno(error));
497 }
498
499 if (qp->msg_rcv_cnt)
500 qp->msg_perm.ipc_mode |= MSG_RWAIT;
501 if (qp->msg_snd_cnt)
502 qp->msg_perm.ipc_mode |= MSG_WWAIT;
503 ipcperm_stat(&STRUCT_BUF(ds)->msg_perm, &qp->msg_perm, mdl);
504 qp->msg_perm.ipc_mode &= ~(MSG_RWAIT|MSG_WWAIT);
505 STRUCT_FSETP(ds, msg_first, NULL); /* kernel addr */
506 STRUCT_FSETP(ds, msg_last, NULL);
507 STRUCT_FSET(ds, msg_cbytes, qp->msg_cbytes);
508 STRUCT_FSET(ds, msg_qnum, qp->msg_qnum);
509 STRUCT_FSET(ds, msg_qbytes, qp->msg_qbytes);
510 STRUCT_FSET(ds, msg_lspid, qp->msg_lspid);
511 STRUCT_FSET(ds, msg_lrpid, qp->msg_lrpid);
512 STRUCT_FSET(ds, msg_stime, qp->msg_stime);
513 STRUCT_FSET(ds, msg_rtime, qp->msg_rtime);
514 STRUCT_FSET(ds, msg_ctime, qp->msg_ctime);
515 break;
516
517 case IPC_SET64:
518 mutex_enter(&pp->p_lock);
519 if ((ds64.msgx_qbytes > qp->msg_qbytes) &&
520 secpolicy_ipc_config(cr) != 0 &&
521 rctl_test(rc_process_msgmnb, pp->p_rctls, pp,
522 ds64.msgx_qbytes, RCA_SAFE) & RCT_DENY) {
523 mutex_exit(&pp->p_lock);
524 mutex_exit(lock);
525 return (set_errno(EPERM));
526 }
527 mutex_exit(&pp->p_lock);
528 if (error = ipcperm_set64(msq_svc, cr, &qp->msg_perm,
529 &ds64.msgx_perm)) {
530 mutex_exit(lock);
531 return (set_errno(error));
532 }
533 qp->msg_qbytes = ds64.msgx_qbytes;
534 qp->msg_ctime = gethrestime_sec();
535 break;
536
537 case IPC_STAT64:
538 if (qp->msg_rcv_cnt)
539 qp->msg_perm.ipc_mode |= MSG_RWAIT;
540 if (qp->msg_snd_cnt)
541 qp->msg_perm.ipc_mode |= MSG_WWAIT;
542 ipcperm_stat64(&ds64.msgx_perm, &qp->msg_perm);
543 qp->msg_perm.ipc_mode &= ~(MSG_RWAIT|MSG_WWAIT);
544 ds64.msgx_cbytes = qp->msg_cbytes;
545 ds64.msgx_qnum = qp->msg_qnum;
546 ds64.msgx_qbytes = qp->msg_qbytes;
547 ds64.msgx_lspid = qp->msg_lspid;
548 ds64.msgx_lrpid = qp->msg_lrpid;
549 ds64.msgx_stime = qp->msg_stime;
550 ds64.msgx_rtime = qp->msg_rtime;
551 ds64.msgx_ctime = qp->msg_ctime;
552 break;
553
554 default:
555 mutex_exit(lock);
556 return (set_errno(EINVAL));
557 }
558
559 mutex_exit(lock);
560
561 /*
562 * Do copyout last (after releasing mutex).
563 */
564 switch (cmd) {
565 case IPC_STAT:
566 if (copyout(STRUCT_BUF(ds), arg, STRUCT_SIZE(ds)))
567 return (set_errno(EFAULT));
568 break;
569
570 case IPC_STAT64:
571 if (copyout(&ds64, arg, sizeof (struct msqid_ds64)))
572 return (set_errno(EFAULT));
573 break;
574 }
575
576 return (0);
577 }
578
579 /*
580 * Remove all message queues associated with a given zone. Called by
581 * zone_shutdown when the zone is halted.
582 */
583 /*ARGSUSED1*/
584 static void
585 msg_remove_zone(zoneid_t zoneid, void *arg)
586 {
587 ipc_remove_zone(msq_svc, zoneid);
588 }
589
590 /*
591 * msgget system call.
592 */
593 static int
594 msgget(key_t key, int msgflg)
595 {
596 kmsqid_t *qp;
597 kmutex_t *lock;
598 int id, error;
599 int ii;
600 proc_t *pp = curproc;
601
602 top:
603 if (error = ipc_get(msq_svc, key, msgflg, (kipc_perm_t **)&qp, &lock))
604 return (set_errno(error));
605
606 if (IPC_FREE(&qp->msg_perm)) {
607 mutex_exit(lock);
608 mutex_exit(&pp->p_lock);
609
610 list_create(&qp->msg_list, sizeof (struct msg),
611 offsetof(struct msg, msg_node));
612 qp->msg_qnum = 0;
613 qp->msg_lspid = qp->msg_lrpid = 0;
614 qp->msg_stime = qp->msg_rtime = 0;
615 qp->msg_ctime = gethrestime_sec();
616 qp->msg_ngt_cnt = 0;
617 qp->msg_neg_copy = 0;
618 for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
619 list_create(&qp->msg_wait_snd[ii],
620 sizeof (msgq_wakeup_t),
621 offsetof(msgq_wakeup_t, msgw_list));
622 list_create(&qp->msg_wait_snd_ngt[ii],
623 sizeof (msgq_wakeup_t),
624 offsetof(msgq_wakeup_t, msgw_list));
625 }
626 /*
627 * The proper initialization of msg_lowest_type is to the
628 * highest possible value. By doing this we guarantee that
629 * when the first send happens, the lowest type will be set
630 * properly.
631 */
632 qp->msg_lowest_type = MSG_SMALL_INIT;
633 list_create(&qp->msg_cpy_block,
634 sizeof (msgq_wakeup_t),
635 offsetof(msgq_wakeup_t, msgw_list));
636 list_create(&qp->msg_wait_rcv,
637 sizeof (msgq_wakeup_t),
638 offsetof(msgq_wakeup_t, msgw_list));
639 qp->msg_fnd_sndr = &msg_fnd_sndr[0];
640 qp->msg_fnd_rdr = &msg_fnd_rdr[0];
641 qp->msg_rcv_cnt = 0;
642 qp->msg_snd_cnt = 0;
643 qp->msg_snd_smallest = MSG_SMALL_INIT;
644
645 if (error = ipc_commit_begin(msq_svc, key, msgflg,
646 (kipc_perm_t *)qp)) {
647 if (error == EAGAIN)
648 goto top;
649 return (set_errno(error));
650 }
651 qp->msg_qbytes = rctl_enforced_value(rc_process_msgmnb,
652 pp->p_rctls, pp);
653 qp->msg_qmax = rctl_enforced_value(rc_process_msgtql,
654 pp->p_rctls, pp);
655 lock = ipc_commit_end(msq_svc, &qp->msg_perm);
656 }
657
658 if (AU_AUDITING())
659 audit_ipcget(AT_IPC_MSG, (void *)qp);
660
661 id = qp->msg_perm.ipc_id;
662 mutex_exit(lock);
663 return (id);
664 }
665
666 static ssize_t
667 msgrcv(int msqid, struct ipcmsgbuf *msgp, size_t msgsz, long msgtyp, int msgflg)
668 {
669 struct msg *smp; /* ptr to best msg on q */
670 kmsqid_t *qp; /* ptr to associated q */
671 kmutex_t *lock;
672 size_t xtsz; /* transfer byte count */
673 int error = 0;
674 int cvres;
675 uint_t msg_hash;
676 msgq_wakeup_t msg_entry;
677
678 CPU_STATS_ADDQ(CPU, sys, msg, 1); /* bump msg send/rcv count */
679
680 msg_hash = msg_type_hash(msgtyp);
681 if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL) {
682 return ((ssize_t)set_errno(EINVAL));
683 }
684 ipc_hold(msq_svc, (kipc_perm_t *)qp);
685
686 if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED())) {
687 goto msgrcv_out;
688 }
689
690 /*
691 * Various information (including the condvar_t) required for the
692 * process to sleep is provided by it's stack.
693 */
694 msg_entry.msgw_thrd = curthread;
695 msg_entry.msgw_snd_wake = 0;
696 msg_entry.msgw_type = msgtyp;
697 findmsg:
698 smp = msgrcv_lookup(qp, msgtyp);
699
700 if (smp) {
701 /*
702 * We found a possible message to copy out.
703 */
704 if ((smp->msg_flags & MSG_RCVCOPY) == 0) {
705 long t = msg_entry.msgw_snd_wake;
706 long copy_type = smp->msg_type;
707
708 /*
709 * It is available, attempt to copy it.
710 */
711 error = msg_copyout(qp, msgtyp, &lock, &xtsz, msgsz,
712 smp, msgp, msgflg);
713
714 /*
715 * It is possible to consume a different message
716 * type then what originally awakened for (negative
717 * types). If this happens a check must be done to
718 * to determine if another receiver is available
719 * for the waking message type, Failure to do this
720 * can result in a message on the queue that can be
721 * serviced by a sleeping receiver.
722 */
723 if (!error && t && (copy_type != t))
724 msg_wakeup_rdr(qp, &qp->msg_fnd_sndr, t);
725
726 /*
727 * Don't forget to wakeup a sleeper that blocked because
728 * we were copying things out.
729 */
730 msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0);
731 goto msgrcv_out;
732 }
733 /*
734 * The selected message is being copied out, so block. We do
735 * not need to wake the next person up on the msg_cpy_block list
736 * due to the fact some one is copying out and they will get
737 * things moving again once the copy is completed.
738 */
739 cvres = msg_rcvq_sleep(&qp->msg_cpy_block,
740 &msg_entry, &lock, qp);
741 error = msgq_check_err(qp, cvres);
742 if (error) {
743 goto msgrcv_out;
744 }
745 goto findmsg;
746 }
747 /*
748 * There isn't a message to copy out that matches the designated
749 * criteria.
750 */
751 if (msgflg & IPC_NOWAIT) {
752 error = ENOMSG;
753 goto msgrcv_out;
754 }
755 msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0);
756
757 /*
758 * Wait for new message. We keep the negative and positive types
759 * separate for performance reasons.
760 */
761 msg_entry.msgw_snd_wake = 0;
762 if (msgtyp >= 0) {
763 cvres = msg_rcvq_sleep(&qp->msg_wait_snd[msg_hash],
764 &msg_entry, &lock, qp);
765 } else {
766 qp->msg_ngt_cnt++;
767 cvres = msg_rcvq_sleep(&qp->msg_wait_snd_ngt[msg_hash],
768 &msg_entry, &lock, qp);
769 qp->msg_ngt_cnt--;
770 }
771
772 if (!(error = msgq_check_err(qp, cvres))) {
773 goto findmsg;
774 }
775
776 msgrcv_out:
777 if (error) {
778 msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0);
779 if (msg_entry.msgw_snd_wake) {
780 msg_wakeup_rdr(qp, &qp->msg_fnd_sndr,
781 msg_entry.msgw_snd_wake);
782 }
783 ipc_rele(msq_svc, (kipc_perm_t *)qp);
784 return ((ssize_t)set_errno(error));
785 }
786 ipc_rele(msq_svc, (kipc_perm_t *)qp);
787 return ((ssize_t)xtsz);
788 }
789
790 static int
791 msgq_check_err(kmsqid_t *qp, int cvres)
792 {
793 if (IPC_FREE(&qp->msg_perm)) {
794 return (EIDRM);
795 }
796
797 if (cvres == 0) {
798 return (EINTR);
799 }
800
801 return (0);
802 }
803
804 static int
805 msg_copyout(kmsqid_t *qp, long msgtyp, kmutex_t **lock, size_t *xtsz_ret,
806 size_t msgsz, struct msg *smp, struct ipcmsgbuf *msgp, int msgflg)
807 {
808 size_t xtsz;
809 STRUCT_HANDLE(ipcmsgbuf, umsgp);
810 model_t mdl = get_udatamodel();
811 int copyerror = 0;
812
813 STRUCT_SET_HANDLE(umsgp, mdl, msgp);
814 if (msgsz < smp->msg_size) {
815 if ((msgflg & MSG_NOERROR) == 0) {
816 return (E2BIG);
817 } else {
818 xtsz = msgsz;
819 }
820 } else {
821 xtsz = smp->msg_size;
822 }
823 *xtsz_ret = xtsz;
824
825 /*
826 * To prevent a DOS attack we mark the message as being
827 * copied out and release mutex. When the copy is completed
828 * we need to acquire the mutex and make the appropriate updates.
829 */
830 ASSERT((smp->msg_flags & MSG_RCVCOPY) == 0);
831 smp->msg_flags |= MSG_RCVCOPY;
832 msg_hold(smp);
833 if (msgtyp < 0) {
834 ASSERT(qp->msg_neg_copy == 0);
835 qp->msg_neg_copy = 1;
836 }
837 mutex_exit(*lock);
838
839 if (mdl == DATAMODEL_NATIVE) {
840 copyerror = copyout(&smp->msg_type, msgp,
841 sizeof (smp->msg_type));
842 } else {
843 /*
844 * 32-bit callers need an imploded msg type.
845 */
846 int32_t msg_type32 = smp->msg_type;
847
848 copyerror = copyout(&msg_type32, msgp,
849 sizeof (msg_type32));
850 }
851
852 if (copyerror == 0 && xtsz) {
853 copyerror = copyout(smp->msg_addr,
854 STRUCT_FADDR(umsgp, mtext), xtsz);
855 }
856
857 /*
858 * Reclaim the mutex and make sure the message queue still exists.
859 */
860
861 *lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
862 if (msgtyp < 0) {
863 qp->msg_neg_copy = 0;
864 }
865 ASSERT(smp->msg_flags & MSG_RCVCOPY);
866 smp->msg_flags &= ~MSG_RCVCOPY;
867 msg_rele(smp);
868 if (IPC_FREE(&qp->msg_perm)) {
869 return (EIDRM);
870 }
871 if (copyerror) {
872 return (EFAULT);
873 }
874 qp->msg_lrpid = ttoproc(curthread)->p_pid;
875 qp->msg_rtime = gethrestime_sec();
876 msgunlink(qp, smp);
877 return (0);
878 }
879
880 static struct msg *
881 msgrcv_lookup(kmsqid_t *qp, long msgtyp)
882 {
883 struct msg *smp = NULL;
884 long qp_low;
885 struct msg *mp; /* ptr to msg on q */
886 long low_msgtype;
887 static struct msg neg_copy_smp;
888
889 mp = list_head(&qp->msg_list);
890 if (msgtyp == 0) {
891 smp = mp;
892 } else {
893 qp_low = qp->msg_lowest_type;
894 if (msgtyp > 0) {
895 /*
896 * If our lowest possible message type is larger than
897 * the message type desired, then we know there is
898 * no entry present.
899 */
900 if (qp_low > msgtyp) {
901 return (NULL);
902 }
903
904 for (; mp; mp = list_next(&qp->msg_list, mp)) {
905 if (msgtyp == mp->msg_type) {
906 smp = mp;
907 break;
908 }
909 }
910 } else {
911 /*
912 * We have kept track of the lowest possible message
913 * type on the send queue. This allows us to terminate
914 * the search early if we find a message type of that
915 * type. Note, the lowest type may not be the actual
916 * lowest value in the system, it is only guaranteed
917 * that there isn't a value lower than that.
918 */
919 low_msgtype = -msgtyp;
920 if (low_msgtype < qp_low) {
921 return (NULL);
922 }
923 if (qp->msg_neg_copy) {
924 neg_copy_smp.msg_flags = MSG_RCVCOPY;
925 return (&neg_copy_smp);
926 }
927 for (; mp; mp = list_next(&qp->msg_list, mp)) {
928 if (mp->msg_type <= low_msgtype &&
929 !(smp && smp->msg_type <= mp->msg_type)) {
930 smp = mp;
931 low_msgtype = mp->msg_type;
932 if (low_msgtype == qp_low) {
933 break;
934 }
935 }
936 }
937 if (smp) {
938 /*
939 * Update the lowest message type.
940 */
941 qp->msg_lowest_type = smp->msg_type;
942 }
943 }
944 }
945 return (smp);
946 }
947
948 /*
949 * msgids system call.
950 */
951 static int
952 msgids(int *buf, uint_t nids, uint_t *pnids)
953 {
954 int error;
955
956 if (error = ipc_ids(msq_svc, buf, nids, pnids))
957 return (set_errno(error));
958
959 return (0);
960 }
961
962 #define RND(x) roundup((x), sizeof (size_t))
963 #define RND32(x) roundup((x), sizeof (size32_t))
964
/*
 * msgsnap system call.
 *
 * Copies a snapshot of the messages on queue 'msqid' that match 'msgtyp'
 * (0: all messages; >0: exact type match; <0: type <= |msgtyp|) into the
 * user buffer 'buf' of size 'bufsz'.  The buffer is filled with a
 * msgsnap_head followed by a msgsnap_mhead plus message text for each
 * matching message.  If 'bufsz' is too small for the matching messages,
 * only the header (carrying the required size and a zero count) is
 * copied out.
 */
static int
msgsnap(int msqid, caddr_t buf, size_t bufsz, long msgtyp)
{
	struct msg *mp;	/* ptr to msg on q */
	kmsqid_t *qp;	/* ptr to associated q */
	kmutex_t *lock;
	size_t size;
	size_t nmsg;
	struct msg **snaplist;
	int error, i;
	model_t mdl = get_udatamodel();
	STRUCT_DECL(msgsnap_head, head);
	STRUCT_DECL(msgsnap_mhead, mhead);

	STRUCT_INIT(head, mdl);
	STRUCT_INIT(mhead, mdl);

	/* The caller's buffer must at least hold the snapshot header. */
	if (bufsz < STRUCT_SIZE(head))
		return (set_errno(EINVAL));

	/* On success, returns with the queue's lock held. */
	if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL)
		return (set_errno(EINVAL));

	if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED())) {
		mutex_exit(lock);
		return (set_errno(error));
	}
	/* Keep the queue id alive while we drop and retake the lock below. */
	ipc_hold(msq_svc, (kipc_perm_t *)qp);

	/*
	 * First compute the required buffer size and
	 * the number of messages on the queue.
	 */
	size = nmsg = 0;
	for (mp = list_head(&qp->msg_list); mp;
	    mp = list_next(&qp->msg_list, mp)) {
		if (msgtyp == 0 ||
		    (msgtyp > 0 && msgtyp == mp->msg_type) ||
		    (msgtyp < 0 && mp->msg_type <= -msgtyp)) {
			nmsg++;
			/* Text is rounded up per the caller's data model. */
			if (mdl == DATAMODEL_NATIVE)
				size += RND(mp->msg_size);
			else
				size += RND32(mp->msg_size);
		}
	}

	size += STRUCT_SIZE(head) + nmsg * STRUCT_SIZE(mhead);
	/* Buffer too small: report the needed size, snapshot no messages. */
	if (size > bufsz)
		nmsg = 0;

	if (nmsg > 0) {
		/*
		 * Mark the messages as being copied.
		 */
		snaplist = (struct msg **)kmem_alloc(nmsg *
		    sizeof (struct msg *), KM_SLEEP);
		i = 0;
		for (mp = list_head(&qp->msg_list); mp;
		    mp = list_next(&qp->msg_list, mp)) {
			if (msgtyp == 0 ||
			    (msgtyp > 0 && msgtyp == mp->msg_type) ||
			    (msgtyp < 0 && mp->msg_type <= -msgtyp)) {
				/*
				 * Hold each message so the copyout below
				 * can run without the queue lock.
				 */
				msg_hold(mp);
				snaplist[i] = mp;
				i++;
			}
		}
	}
	mutex_exit(lock);

	/*
	 * Copy out the buffer header.
	 */
	STRUCT_FSET(head, msgsnap_size, size);
	STRUCT_FSET(head, msgsnap_nmsg, nmsg);
	if (copyout(STRUCT_BUF(head), buf, STRUCT_SIZE(head)))
		error = EFAULT;

	buf += STRUCT_SIZE(head);

	/*
	 * Now copy out the messages one by one.
	 */
	for (i = 0; i < nmsg; i++) {
		mp = snaplist[i];
		if (error == 0) {
			STRUCT_FSET(mhead, msgsnap_mlen, mp->msg_size);
			STRUCT_FSET(mhead, msgsnap_mtype, mp->msg_type);
			if (copyout(STRUCT_BUF(mhead), buf, STRUCT_SIZE(mhead)))
				error = EFAULT;
			buf += STRUCT_SIZE(mhead);

			if (error == 0 &&
			    mp->msg_size != 0 &&
			    copyout(mp->msg_addr, buf, mp->msg_size))
				error = EFAULT;
			if (mdl == DATAMODEL_NATIVE)
				buf += RND(mp->msg_size);
			else
				buf += RND32(mp->msg_size);
		}
		/* Retake the lock to drop our hold on this message. */
		lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
		msg_rele(mp);
		/* Check for msg q deleted or reallocated */
		if (IPC_FREE(&qp->msg_perm))
			error = EIDRM;
		mutex_exit(lock);
	}

	/* ipc_rele() expects the lock held and drops it for us. */
	(void) ipc_lock(msq_svc, qp->msg_perm.ipc_id);
	ipc_rele(msq_svc, (kipc_perm_t *)qp);

	if (nmsg > 0)
		kmem_free(snaplist, nmsg * sizeof (struct msg *));

	if (error)
		return (set_errno(error));
	return (0);
}
1088
#define MSG_PREALLOC_LIMIT 8192

/*
 * msgsnd system call.
 *
 * Places the user message 'msgp' (of text size 'msgsz') onto queue
 * 'msqid'.  If the queue is full, blocks until space is available unless
 * IPC_NOWAIT is set in 'msgflg' (then fails with EAGAIN).
 */
static int
msgsnd(int msqid, struct ipcmsgbuf *msgp, size_t msgsz, int msgflg)
{
	kmsqid_t *qp;
	kmutex_t *lock = NULL;
	struct msg *mp = NULL;
	long type;
	int error = 0, wait_wakeup = 0;
	msgq_wakeup_t msg_entry;
	model_t mdl = get_udatamodel();
	STRUCT_HANDLE(ipcmsgbuf, umsgp);

	CPU_STATS_ADDQ(CPU, sys, msg, 1); /* bump msg send/rcv count */
	STRUCT_SET_HANDLE(umsgp, mdl, msgp);

	/* Fetch the message type; it is 32 bits wide for ILP32 callers. */
	if (mdl == DATAMODEL_NATIVE) {
		if (copyin(msgp, &type, sizeof (type)))
			return (set_errno(EFAULT));
	} else {
		int32_t type32;
		if (copyin(msgp, &type32, sizeof (type32)))
			return (set_errno(EFAULT));
		type = type32;
	}

	/* Message types must be strictly positive. */
	if (type < 1)
		return (set_errno(EINVAL));

	/*
	 * We want the value here large enough that most of the
	 * the message operations will use the "lockless" path,
	 * but small enough that a user can not reserve large
	 * chunks of kernel memory unless they have a valid
	 * reason to.
	 */
	if (msgsz <= MSG_PREALLOC_LIMIT) {
		/*
		 * We are small enough that we can afford to do the
		 * allocation now. This saves dropping the lock
		 * and then reacquiring the lock.
		 */
		mp = kmem_zalloc(sizeof (struct msg), KM_SLEEP);
		mp->msg_copycnt = 1;
		mp->msg_size = msgsz;
		if (msgsz) {
			mp->msg_addr = kmem_alloc(msgsz, KM_SLEEP);
			if (copyin(STRUCT_FADDR(umsgp, mtext),
			    mp->msg_addr, msgsz) == -1) {
				error = EFAULT;
				goto msgsnd_out;
			}
		}
	}

	/* Look up the queue; on success the queue lock is held. */
	if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL) {
		error = EINVAL;
		goto msgsnd_out;
	}

	/* Reference the id across the blocking operations below. */
	ipc_hold(msq_svc, (kipc_perm_t *)qp);

	if (msgsz > qp->msg_qbytes) {
		error = EINVAL;
		goto msgsnd_out;
	}

	if (error = ipcperm_access(&qp->msg_perm, MSG_W, CRED()))
		goto msgsnd_out;

top:
	/*
	 * Allocate space on q, message header, & buffer space.
	 */
	ASSERT(qp->msg_qnum <= qp->msg_qmax);
	while ((msgsz > qp->msg_qbytes - qp->msg_cbytes) ||
	    (qp->msg_qnum == qp->msg_qmax)) {
		int cvres;

		if (msgflg & IPC_NOWAIT) {
			error = EAGAIN;
			goto msgsnd_out;
		}

		/* Queue full: join the senders' wait list and sleep. */
		wait_wakeup = 0;
		qp->msg_snd_cnt++;
		msg_entry.msgw_snd_size = msgsz;
		msg_entry.msgw_thrd = curthread;
		msg_entry.msgw_type = type;
		cv_init(&msg_entry.msgw_wake_cv, NULL, 0, NULL);
		list_insert_tail(&qp->msg_wait_rcv, &msg_entry);
		/* Track the smallest blocked send for msg_wakeup_senders(). */
		if (qp->msg_snd_smallest > msgsz)
			qp->msg_snd_smallest = msgsz;
		cvres = cv_wait_sig(&msg_entry.msgw_wake_cv, lock);
		lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, lock);
		qp->msg_snd_cnt--;
		/* If nobody removed us from the wait list, do it ourselves. */
		if (list_link_active(&msg_entry.msgw_list))
			list_remove(&qp->msg_wait_rcv, &msg_entry);
		if (error = msgq_check_err(qp, cvres)) {
			goto msgsnd_out;
		}
		wait_wakeup = 1;
	}

	if (mp == NULL) {
		int failure;

		/*
		 * Large message: allocate and copy in with the lock
		 * dropped, then revalidate the queue and retry from top.
		 */
		mutex_exit(lock);
		ASSERT(msgsz > 0);
		mp = kmem_zalloc(sizeof (struct msg), KM_SLEEP);
		mp->msg_addr = kmem_alloc(msgsz, KM_SLEEP);
		mp->msg_size = msgsz;
		mp->msg_copycnt = 1;

		failure = (copyin(STRUCT_FADDR(umsgp, mtext),
		    mp->msg_addr, msgsz) == -1);
		lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
		/* The queue may have been removed while unlocked. */
		if (IPC_FREE(&qp->msg_perm)) {
			error = EIDRM;
			goto msgsnd_out;
		}
		if (failure) {
			error = EFAULT;
			goto msgsnd_out;
		}
		goto top;
	}

	/*
	 * Everything is available, put msg on q.
	 */
	qp->msg_qnum++;
	qp->msg_cbytes += msgsz;
	qp->msg_lspid = curproc->p_pid;
	qp->msg_stime = gethrestime_sec();
	mp->msg_type = type;
	if (qp->msg_lowest_type > type)
		qp->msg_lowest_type = type;
	list_insert_tail(&qp->msg_list, mp);
	/*
	 * Get the proper receiver going.
	 */
	msg_wakeup_rdr(qp, &qp->msg_fnd_sndr, type);

msgsnd_out:
	/*
	 * We were woken up from the send wait list, but an
	 * error occurred on placing the message onto the
	 * msg queue. Given that, we need to do the wakeup
	 * dance again.
	 */

	if (wait_wakeup && error) {
		msg_wakeup_senders(qp);
	}
	if (lock)
		ipc_rele(msq_svc, (kipc_perm_t *)qp);	/* drops lock */

	if (error) {
		if (mp)
			msg_rele(mp);
		return (set_errno(error));
	}

	return (0);
}
1259
1260 static void
1261 msg_wakeup_rdr(kmsqid_t *qp, msg_select_t **flist, long type)
1262 {
1263 msg_select_t *walker = *flist;
1264 msgq_wakeup_t *wakeup;
1265 uint_t msg_hash;
1266
1267 msg_hash = msg_type_hash(type);
1268
1269 do {
1270 wakeup = walker->selection(qp, msg_hash, type);
1271 walker = walker->next_selection;
1272 } while (!wakeup && walker != *flist);
1273
1274 *flist = (*flist)->next_selection;
1275 if (wakeup) {
1276 if (type) {
1277 wakeup->msgw_snd_wake = type;
1278 }
1279 cv_signal(&wakeup->msgw_wake_cv);
1280 }
1281 }
1282
1283 static uint_t
1284 msg_type_hash(long msg_type)
1285 {
1286 if (msg_type < 0) {
1287 long hash = -msg_type / MSG_NEG_INTERVAL;
1288 /*
1289 * Negative message types are hashed over an
1290 * interval. Any message type that hashes
1291 * beyond MSG_MAX_QNUM is automatically placed
1292 * in the last bucket.
1293 */
1294 if (hash > MSG_MAX_QNUM)
1295 hash = MSG_MAX_QNUM;
1296 return (hash);
1297 }
1298
1299 /*
1300 * 0 or positive message type. The first bucket is reserved for
1301 * message receivers of type 0, the other buckets we hash into.
1302 */
1303 if (msg_type)
1304 return (1 + (msg_type % MSG_MAX_QNUM));
1305 return (0);
1306 }
1307
/*
 * Routine to find a receiver of type 0 that is blocked waiting
 * for a message. Simply return the first one on the list.
 */
1312
1313 static msgq_wakeup_t *
1314 /* ARGSUSED */
1315 msg_fnd_any_snd(kmsqid_t *qp, int msg_hash, long type)
1316 {
1317 msgq_wakeup_t *walker;
1318
1319 walker = list_head(&qp->msg_wait_snd[0]);
1320
1321 if (walker)
1322 list_remove(&qp->msg_wait_snd[0], walker);
1323 return (walker);
1324 }
1325
1326 static msgq_wakeup_t *
1327 /* ARGSUSED */
1328 msg_fnd_any_rdr(kmsqid_t *qp, int msg_hash, long type)
1329 {
1330 msgq_wakeup_t *walker;
1331
1332 walker = list_head(&qp->msg_cpy_block);
1333 if (walker)
1334 list_remove(&qp->msg_cpy_block, walker);
1335 return (walker);
1336 }
1337
1338 static msgq_wakeup_t *
1339 msg_fnd_spc_snd(kmsqid_t *qp, int msg_hash, long type)
1340 {
1341 msgq_wakeup_t *walker;
1342
1343 walker = list_head(&qp->msg_wait_snd[msg_hash]);
1344
1345 while (walker && walker->msgw_type != type)
1346 walker = list_next(&qp->msg_wait_snd[msg_hash], walker);
1347 if (walker)
1348 list_remove(&qp->msg_wait_snd[msg_hash], walker);
1349 return (walker);
1350 }
1351
/* ARGSUSED */
static msgq_wakeup_t *
msg_fnd_neg_snd(kmsqid_t *qp, int msg_hash, long type)
{
	msgq_wakeup_t *qptr;
	int count;
	int check_index;
	int neg_index;
	int nbuckets;

	/* Fast exit if no negative-type receivers are waiting. */
	if (!qp->msg_ngt_cnt) {
		return (NULL);
	}
	/* Bucket of the smallest |msgtyp| that could accept 'type'. */
	neg_index = msg_type_hash(-type);

	/*
	 * Check for a match among the negative type queues. Any buckets
	 * at neg_index or larger can match the type. Use the last send
	 * time to randomize the starting bucket to prevent starvation.
	 * Search all buckets from neg_index to MSG_MAX_QNUM, starting
	 * from the random starting point, and wrapping around after
	 * MSG_MAX_QNUM.
	 */

	nbuckets = MSG_MAX_QNUM - neg_index + 1;
	check_index = neg_index + (qp->msg_stime % nbuckets);

	for (count = nbuckets; count > 0; count--) {
		qptr = list_head(&qp->msg_wait_snd_ngt[check_index]);
		while (qptr) {
			/*
			 * The lowest hash bucket may actually contain
			 * message types that are not valid for this
			 * request. This can happen due to the fact that
			 * the message buckets actually contain a consecutive
			 * range of types.
			 */
			if (-qptr->msgw_type >= type) {
				list_remove(&qp->msg_wait_snd_ngt[check_index],
				    qptr);
				return (qptr);
			}
			qptr = list_next(&qp->msg_wait_snd_ngt[check_index],
			    qptr);
		}
		/* Wrap around to neg_index after the last bucket. */
		if (++check_index > MSG_MAX_QNUM) {
			check_index = neg_index;
		}
	}
	return (NULL);
}
1403
/*
 * Put a receiver 'entry' on the given wait queue and sleep until it is
 * signalled or interrupted.  The queue lock is dropped while asleep and
 * reacquired via ipc_relock(); *lock is updated for the caller.
 * Returns the cv_wait_sig() result (0 when interrupted by a signal).
 */
static int
msg_rcvq_sleep(list_t *queue, msgq_wakeup_t *entry, kmutex_t **lock,
    kmsqid_t *qp)
{
	int cvres;

	cv_init(&entry->msgw_wake_cv, NULL, 0, NULL);

	list_insert_tail(queue, entry);

	qp->msg_rcv_cnt++;
	cvres = cv_wait_sig(&entry->msgw_wake_cv, *lock);
	*lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, *lock);
	qp->msg_rcv_cnt--;

	if (list_link_active(&entry->msgw_list)) {
		/*
		 * We woke up unexpectedly, remove ourself.
		 */
		list_remove(queue, entry);
	}

	return (cvres);
}
1428
1429 static void
1430 msg_rcvq_wakeup_all(list_t *q_ptr)
1431 {
1432 msgq_wakeup_t *q_walk;
1433
1434 while (q_walk = list_head(q_ptr)) {
1435 list_remove(q_ptr, q_walk);
1436 cv_signal(&q_walk->msgw_wake_cv);
1437 }
1438 }
1439
1440 /*
1441 * msgsys - System entry point for msgctl, msgget, msgrcv, and msgsnd
1442 * system calls.
1443 */
1444 static ssize_t
1445 msgsys(int opcode, uintptr_t a1, uintptr_t a2, uintptr_t a3,
1446 uintptr_t a4, uintptr_t a5)
1447 {
1448 ssize_t error;
1449
1450 switch (opcode) {
1451 case MSGGET:
1452 error = msgget((key_t)a1, (int)a2);
1453 break;
1454 case MSGCTL:
1455 error = msgctl((int)a1, (int)a2, (void *)a3);
1456 break;
1457 case MSGRCV:
1458 error = msgrcv((int)a1, (struct ipcmsgbuf *)a2,
1459 (size_t)a3, (long)a4, (int)a5);
1460 break;
1461 case MSGSND:
1462 error = msgsnd((int)a1, (struct ipcmsgbuf *)a2,
1463 (size_t)a3, (int)a4);
1464 break;
1465 case MSGIDS:
1466 error = msgids((int *)a1, (uint_t)a2, (uint_t *)a3);
1467 break;
1468 case MSGSNAP:
1469 error = msgsnap((int)a1, (caddr_t)a2, (size_t)a3, (long)a4);
1470 break;
1471 default:
1472 error = set_errno(EINVAL);
1473 break;
1474 }
1475
1476 return (error);
1477 }
1478
/*
 * Determine if a writer who is waiting can process its message. If so
 * wake it up.
 */
static void
msg_wakeup_senders(kmsqid_t *qp)

{
	struct msgq_wakeup *ptr, *optr;
	size_t avail, smallest;
	int msgs_out;

	/*
	 * Is there a writer waiting, and if so, can it be serviced? If
	 * not return back to the caller.
	 */
	if (IPC_FREE(&qp->msg_perm) || qp->msg_qnum >= qp->msg_qmax)
		return;

	avail = qp->msg_qbytes - qp->msg_cbytes;
	/* No waiter's message can fit; nothing to do. */
	if (avail < qp->msg_snd_smallest)
		return;

	ptr = list_head(&qp->msg_wait_rcv);
	if (ptr == NULL) {
		/* No waiters at all; reset the tracked minimum. */
		qp->msg_snd_smallest = MSG_SMALL_INIT;
		return;
	}
	optr = ptr;

	/*
	 * smallest: minimum message size of all queued writers
	 *
	 * avail: amount of space left on the msgq
	 * if all the writers we have woken up are successful.
	 *
	 * msgs_out: is the number of messages on the message queue if
	 * all the writers we have woken up are successful.
	 */

	smallest = MSG_SMALL_INIT;
	msgs_out = qp->msg_qnum;
	while (ptr) {
		/*
		 * Advance 'ptr' before possibly removing 'optr' so the
		 * list walk survives the removal.
		 */
		ptr = list_next(&qp->msg_wait_rcv, ptr);
		if (optr->msgw_snd_size <= avail) {
			list_remove(&qp->msg_wait_rcv, optr);
			avail -= optr->msgw_snd_size;
			cv_signal(&optr->msgw_wake_cv);
			msgs_out++;
			/* Stop once the queue would be full again. */
			if (msgs_out == qp->msg_qmax ||
			    avail < qp->msg_snd_smallest)
				break;
		} else {
			/* Too big to wake now; track the new minimum. */
			if (smallest > optr->msgw_snd_size)
				smallest = optr->msgw_snd_size;
		}
		optr = ptr;
	}

	/*
	 * Reset the smallest message size if the entire list has been visited
	 */
	if (ptr == NULL && smallest != MSG_SMALL_INIT)
		qp->msg_snd_smallest = smallest;
}
1544
1545 #ifdef _SYSCALL32_IMPL
1546 /*
1547 * msgsys32 - System entry point for msgctl, msgget, msgrcv, and msgsnd
1548 * system calls for 32-bit callers on LP64 kernel.
1549 */
1550 static ssize32_t
1551 msgsys32(int opcode, uint32_t a1, uint32_t a2, uint32_t a3,
1552 uint32_t a4, uint32_t a5)
1553 {
1554 ssize_t error;
1555
1556 switch (opcode) {
1557 case MSGGET:
1558 error = msgget((key_t)a1, (int)a2);
1559 break;
1560 case MSGCTL:
1561 error = msgctl((int)a1, (int)a2, (void *)(uintptr_t)a3);
1562 break;
1563 case MSGRCV:
1564 error = msgrcv((int)a1, (struct ipcmsgbuf *)(uintptr_t)a2,
1565 (size_t)a3, (long)(int32_t)a4, (int)a5);
1566 break;
1567 case MSGSND:
1568 error = msgsnd((int)a1, (struct ipcmsgbuf *)(uintptr_t)a2,
1569 (size_t)(int32_t)a3, (int)a4);
1570 break;
1571 case MSGIDS:
1572 error = msgids((int *)(uintptr_t)a1, (uint_t)a2,
1573 (uint_t *)(uintptr_t)a3);
1574 break;
1575 case MSGSNAP:
1576 error = msgsnap((int)a1, (caddr_t)(uintptr_t)a2, (size_t)a3,
1577 (long)(int32_t)a4);
1578 break;
1579 default:
1580 error = set_errno(EINVAL);
1581 break;
1582 }
1583
1584 return (error);
1585 }
#endif /* _SYSCALL32_IMPL */