1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
  23  * Use is subject to license terms.
  24  */
  25 
  26 /*      Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T     */
  27 /*        All Rights Reserved   */
  28 
  29 
  30 /*
  31  * Inter-Process Communication Message Facility.
  32  *
  33  * See os/ipc.c for a description of common IPC functionality.
  34  *
  35  * Resource controls
  36  * -----------------
  37  *
  38  * Control:      zone.max-msg-ids (rc_zone_msgmni)
  39  * Description:  Maximum number of message queue ids allowed a zone.
  40  *
  41  *   When msgget() is used to allocate a message queue, one id is
  42  *   allocated.  If the id allocation doesn't succeed, msgget() fails
  43  *   and errno is set to ENOSPC.  Upon successful msgctl(, IPC_RMID)
  44  *   the id is deallocated.
  45  *
  46  * Control:      project.max-msg-ids (rc_project_msgmni)
  47  * Description:  Maximum number of message queue ids allowed a project.
  48  *
  49  *   When msgget() is used to allocate a message queue, one id is
  50  *   allocated.  If the id allocation doesn't succeed, msgget() fails
  51  *   and errno is set to ENOSPC.  Upon successful msgctl(, IPC_RMID)
  52  *   the id is deallocated.
  53  *
  54  * Control:      process.max-msg-qbytes (rc_process_msgmnb)
  55  * Description:  Maximum number of bytes of messages on a message queue.
  56  *
  57  *   When msgget() successfully allocates a message queue, the minimum
  58  *   enforced value of this limit is used to initialize msg_qbytes.
  59  *
  60  * Control:      process.max-msg-messages (rc_process_msgtql)
  61  * Description:  Maximum number of messages on a message queue.
  62  *
  63  *   When msgget() successfully allocates a message queue, the minimum
  64  *   enforced value of this limit is used to initialize a per-queue
  65  *   limit on the number of messages.
  66  */
  67 
  68 #include <sys/types.h>
  69 #include <sys/t_lock.h>
  70 #include <sys/param.h>
  71 #include <sys/cred.h>
  72 #include <sys/user.h>
  73 #include <sys/proc.h>
  74 #include <sys/time.h>
  75 #include <sys/ipc.h>
  76 #include <sys/ipc_impl.h>
  77 #include <sys/msg.h>
  78 #include <sys/msg_impl.h>
  79 #include <sys/list.h>
  80 #include <sys/systm.h>
  81 #include <sys/sysmacros.h>
  82 #include <sys/cpuvar.h>
  83 #include <sys/kmem.h>
  84 #include <sys/ddi.h>
  85 #include <sys/errno.h>
  86 #include <sys/cmn_err.h>
  87 #include <sys/debug.h>
  88 #include <sys/project.h>
  89 #include <sys/modctl.h>
  90 #include <sys/syscall.h>
  91 #include <sys/policy.h>
  92 #include <sys/zone.h>
  93 
  94 #include <c2/audit.h>
  95 
  96 /*
  97  * The following tunables are obsolete.  Though for compatibility we
  98  * still read and interpret msginfo_msgmnb, msginfo_msgmni, and
  99  * msginfo_msgtql (see os/project.c and os/rctl_proc.c), the preferred
 100  * mechanism for administrating the IPC Message facility is through the
 101  * resource controls described at the top of this file.
 102  */
size_t  msginfo_msgmax = 2048;  /* (obsolete) */
size_t  msginfo_msgmnb = 4096;  /* (obsolete) still read for compatibility */
int     msginfo_msgmni = 50;    /* (obsolete) still read for compatibility */
int     msginfo_msgtql = 40;    /* (obsolete) still read for compatibility */
int     msginfo_msgssz = 8;     /* (obsolete) */
int     msginfo_msgmap = 0;     /* (obsolete) */
ushort_t msginfo_msgseg = 1024; /* (obsolete) */
 110 
/* Resource control handles; they are defined outside this file. */
extern rctl_hndl_t rc_zone_msgmni;      /* zone.max-msg-ids */
extern rctl_hndl_t rc_project_msgmni;   /* project.max-msg-ids */
extern rctl_hndl_t rc_process_msgmnb;   /* process.max-msg-qbytes */
extern rctl_hndl_t rc_process_msgtql;   /* process.max-msg-messages */
static ipc_service_t *msq_svc;          /* assigned by ipcs_create() in _init() */
static zone_key_t msg_zone_key;         /* filled in by zone_key_create() */

/* Callbacks handed to ipcs_create() and zone_key_create() by _init(). */
static void msg_dtor(kipc_perm_t *);
static void msg_rmid(kipc_perm_t *);
static void msg_remove_zone(zoneid_t, void *);
 121 
 122 /*
 123  * Module linkage information for the kernel.
 124  */
/*
 * Native system call handler.  It takes an opcode plus five word-sized
 * arguments, which matches the count of 6 recorded in ipcmsg_sysent.
 */
static ssize_t msgsys(int opcode, uintptr_t a0, uintptr_t a1, uintptr_t a2,
        uintptr_t a4, uintptr_t a5);

/*
 * System call table entry for the native handler.  The return-value flag is
 * SE_64RVAL on _LP64 kernels and SE_32RVAL1 otherwise.
 */
static struct sysent ipcmsg_sysent = {
        6,
#ifdef  _LP64
        SE_ARGC | SE_NOUNLOAD | SE_64RVAL,
#else
        SE_ARGC | SE_NOUNLOAD | SE_32RVAL1,
#endif
        (int (*)())msgsys
};

#ifdef  _SYSCALL32_IMPL
/* 32-bit flavor of the handler; same argument count, 32-bit words. */
static ssize32_t msgsys32(int opcode, uint32_t a0, uint32_t a1, uint32_t a2,
        uint32_t a4, uint32_t a5);

/* System call table entry for the 32-bit handler. */
static struct sysent ipcmsg_sysent32 = {
        6,
        SE_ARGC | SE_NOUNLOAD | SE_32RVAL1,
        (int (*)())msgsys32
};
#endif  /* _SYSCALL32_IMPL */

/* Module linkage element describing the native system call. */
static struct modlsys modlsys = {
        &mod_syscallops, "System V message facility", &ipcmsg_sysent
};

#ifdef _SYSCALL32_IMPL
/* Module linkage element describing the 32-bit system call. */
static struct modlsys modlsys32 = {
        &mod_syscallops32, "32-bit System V message facility", &ipcmsg_sysent32
};
#endif
 158 
 159 /*
 160  *      Big Theory statement for message queue correctness
 161  *
 * The msgrcv and msgsnd functions no longer use cv_broadcast to wake up
 * receivers who are waiting for an event.  Using the cv_broadcast method
 * resulted in negative scaling when the number of waiting receivers is large
 * (the thundering herd problem).  Instead, the receivers waiting to receive a
 * message are now linked in a queue-like fashion and awakened one at a time
 * in a controlled manner.
 168  *
 169  * Receivers can block on two different classes of waiting list:
 170  *    1) "sendwait" list, which is the more complex list of the two.  The
 171  *        receiver will be awakened by a sender posting a new message.  There
 172  *        are two types of "sendwait" list used:
 173  *              a) msg_wait_snd: handles all receivers who are looking for
 174  *                 a message type >= 0, but was unable to locate a match.
 175  *
 176  *                 slot 0: reserved for receivers that have designated they
 177  *                         will take any message type.
 178  *                 rest:   consist of receivers requesting a specific type
 179  *                         but the type was not present.  The entries are
 180  *                         hashed into a bucket in an attempt to keep
 181  *                         any list search relatively short.
 182  *              b) msg_wait_snd_ngt: handles all receivers that have designated
 183  *                 a negative message type. Unlike msg_wait_snd, the hash bucket
 184  *                 serves a range of negative message types (-1 to -5, -6 to -10
 185  *                 and so forth), where the last bucket is reserved for all the
 186  *                 negative message types that hash outside of MSG_MAX_QNUM - 1.
 187  *                 This is done this way to simplify the operation of locating a
 188  *                 negative message type.
 189  *
 190  *    2) "copyout" list, where the receiver is awakened by another
 191  *       receiver after a message is copied out.  This is a linked list
 192  *       of waiters that are awakened one at a time.  Although the solution is
 193  *       not optimal, the complexity that would be added in for waking
 194  *       up the right entry far exceeds any potential pay back (too many
 195  *       correctness and corner case issues).
 196  *
 197  * The lists are doubly linked.  In the case of the "sendwait"
 198  * list, this allows the thread to remove itself from the list without having
 199  * to traverse the list.  In the case of the "copyout" list it simply allows
 200  * us to use common functions with the "sendwait" list.
 201  *
 202  * To make sure receivers are not hung out to dry, we must guarantee:
 203  *    1. If any queued message matches any receiver, then at least one
 204  *       matching receiver must be processing the request.
 *    2. Blocking on the copyout queue is only temporary while messages
 *       are being copied out.  The process is guaranteed to wake up
 *       when it gets to the front of the queue (copyout is a FIFO).
 208  *
 209  * Rules for blocking and waking up:
 210  *   1. A receiver entering msgrcv must examine all messages for a match
 211  *      before blocking on a sendwait queue.
 *   2. If the receiver blocks because the message it chose is already
 *      being copied out, then when it wakes up it needs to start
 *      checking the messages from the beginning.
 215  *   3) When ever a process returns from msgrcv for any reason, if it
 216  *      had attempted to copy a message or blocked waiting for a copy
 217  *      to complete it needs to wakeup the next receiver blocked on
 218  *      a copy out.
 219  *   4) When a message is sent, the sender selects a process waiting
 220  *      for that type of message.  This selection process rotates between
 221  *      receivers types of 0, negative and positive to prevent starvation of
 222  *      any one particular receiver type.
 223  *   5) The following are the scenarios for processes that are awakened
 224  *      by a msgsnd:
 225  *              a) The process finds the message and is able to copy
 226  *                 it out.  Once complete, the process returns.
 227  *              b) The message that was sent that triggered the wakeup is no
 228  *                 longer available (another process found the message first).
 229  *                 We issue a wakeup on copy queue and then go back to
 230  *                 sleep waiting for another matching message to be sent.
 231  *              c) The message that was supposed to be processed was
 232  *                 already serviced by another process.  However a different
 233  *                 message is present which we can service.  The message
 234  *                 is copied and the process returns.
 *              d) The message is found, but some sort of error occurs that
 *                 prevents the message from being copied.  The receiver
 *                 wakes up the next receiver that can service this message
 *                 type and returns an error to the caller.
 239  *              e) The message is found, but it is marked as being copied
 240  *                 out.  The receiver then goes to sleep on the copyout
 241  *                 queue where it will be awakened again sometime in the future.
 242  *
 243  *
 244  *   6) Whenever a message is found that matches the message type designated,
 245  *      but is being copied out we have to block on the copyout queue.
 *      After the copying process finishes the copyout, it must wake up
 *      (either directly or indirectly) all receivers who blocked on its
 *      copyout, so they are guaranteed a chance to examine the remaining
 *      messages.
 249  *      This is implemented via a chain of wakeups: Y wakes X, who wakes Z,
 250  *      and so on.  The chain cannot be broken.  This leads to the following
 251  *      cases:
 *              a) When a receiver has finished copying the message (or
 *                 encountered an error), the first entry on the copyout
 *                 queue is woken up.
 255  *              b) When the receiver is woken up, it attempts to locate
 256  *                 a message type match.
 257  *              c) If a message type is found and
 258  *                      -- MSG_RCVCOPY flag is not set, the message is
 259  *                         marked for copying out.  Regardless of the copyout
 260  *                         success the next entry on the copyout queue is
 261  *                         awakened and the operation is completed.
 262  *                      -- MSG_RCVCOPY is set, we simply go back to sleep again
 263  *                         on the copyout queue.
 264  *              d) If the message type is not found then we wakeup the next
 265  *                 process on the copyout queue.
 *   7) If a msgsnd is unable to complete for any of the following reasons
 267  *        a) the msgq has no space for the message
 268  *        b) the maximum number of messages allowed has been reached
 269  *      then one of two things happen:
 270  *        1) If the passed in msg_flag has IPC_NOWAIT set, then
 271  *           an error is returned.
 *        2) If the IPC_NOWAIT bit is not set in msg_flag, then the
 *           thread is put to sleep until the request can be
 *           serviced.
 275  *   8) When waking a thread waiting to send a message, a check is done to
 276  *      verify that the operation being asked for by the thread will complete.
 277  *      This decision making process is done in a loop where the oldest request
 278  *      is checked first. The search will continue until there is no more
 279  *      room on the msgq or we have checked all the waiters.
 280  */
 281 
/* Forward declarations for helpers defined later in this file. */
static uint_t msg_type_hash(long);
static int msgq_check_err(kmsqid_t *qp, int cvres);
static int msg_rcvq_sleep(list_t *, msgq_wakeup_t *, kmutex_t **,
    kmsqid_t *);
static int msg_copyout(kmsqid_t *, long, kmutex_t **, size_t *, size_t,
    struct msg *, struct ipcmsgbuf *, int);
static void msg_rcvq_wakeup_all(list_t *);
static void msg_wakeup_senders(kmsqid_t *);
static void msg_wakeup_rdr(kmsqid_t *, msg_select_t **, long);
static msgq_wakeup_t *msg_fnd_any_snd(kmsqid_t *, int, long);
static msgq_wakeup_t *msg_fnd_any_rdr(kmsqid_t *, int, long);
static msgq_wakeup_t *msg_fnd_neg_snd(kmsqid_t *, int, long);
static msgq_wakeup_t *msg_fnd_spc_snd(kmsqid_t *, int, long);
static struct msg *msgrcv_lookup(kmsqid_t *, long);

/*
 * Circular table of selector routines.  Each entry names a lookup function
 * and points at the next entry; the last entry points back to the first.
 * msgget() starts every new queue at entry 0.
 */
msg_select_t msg_fnd_sndr[] = {
        { msg_fnd_any_snd, &msg_fnd_sndr[1] },
        { msg_fnd_spc_snd, &msg_fnd_sndr[2] },
        { msg_fnd_neg_snd, &msg_fnd_sndr[0] }
};

/* One-entry selector table whose single entry points back at itself. */
msg_select_t msg_fnd_rdr[1] = {
        { msg_fnd_any_rdr, &msg_fnd_rdr[0] },
};
 306 
/*
 * Linkage handed to mod_install() and mod_info().  It lists the native
 * system call element, the 32-bit element when _SYSCALL32_IMPL is defined,
 * and a terminating NULL.
 */
static struct modlinkage modlinkage = {
        MODREV_1,
        &modlsys,
#ifdef _SYSCALL32_IMPL
        &modlsys32,
#endif
        NULL
};
 315 
 316 #define MSG_SMALL_INIT (size_t)-1
 317 int
 318 _init(void)
 319 {
 320         int result;
 321 
 322         msq_svc = ipcs_create("msqids", rc_project_msgmni, rc_zone_msgmni,
 323             sizeof (kmsqid_t), msg_dtor, msg_rmid, AT_IPC_MSG,
 324             offsetof(ipc_rqty_t, ipcq_msgmni));
 325         zone_key_create(&msg_zone_key, NULL, msg_remove_zone, NULL);
 326 
 327         if ((result = mod_install(&modlinkage)) == 0)
 328                 return (0);
 329 
 330         (void) zone_key_delete(msg_zone_key);
 331         ipcs_destroy(msq_svc);
 332 
 333         return (result);
 334 }
 335 
 336 int
 337 _fini(void)
 338 {
 339         return (EBUSY);
 340 }
 341 
 342 int
 343 _info(struct modinfo *modinfop)
 344 {
 345         return (mod_info(&modlinkage, modinfop));
 346 }
 347 
 348 static void
 349 msg_dtor(kipc_perm_t *perm)
 350 {
 351         kmsqid_t *qp = (kmsqid_t *)perm;
 352         int             ii;
 353 
 354         for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
 355                 ASSERT(list_is_empty(&qp->msg_wait_snd[ii]));
 356                 ASSERT(list_is_empty(&qp->msg_wait_snd_ngt[ii]));
 357                 list_destroy(&qp->msg_wait_snd[ii]);
 358                 list_destroy(&qp->msg_wait_snd_ngt[ii]);
 359         }
 360         ASSERT(list_is_empty(&qp->msg_cpy_block));
 361         ASSERT(list_is_empty(&qp->msg_wait_rcv));
 362         list_destroy(&qp->msg_cpy_block);
 363         ASSERT(qp->msg_snd_cnt == 0);
 364         ASSERT(qp->msg_cbytes == 0);
 365         list_destroy(&qp->msg_list);
 366         list_destroy(&qp->msg_wait_rcv);
 367 }
 368 
 369 
 370 #define msg_hold(mp)    (mp)->msg_copycnt++
 371 
 372 /*
 373  * msg_rele - decrement the reference count on the message.  When count
 374  * reaches zero, free message header and contents.
 375  */
 376 static void
 377 msg_rele(struct msg *mp)
 378 {
 379         ASSERT(mp->msg_copycnt > 0);
 380         if (mp->msg_copycnt-- == 1) {
 381                 if (mp->msg_addr)
 382                         kmem_free(mp->msg_addr, mp->msg_size);
 383                 kmem_free(mp, sizeof (struct msg));
 384         }
 385 }
 386 
 387 /*
 388  * msgunlink - Unlink msg from queue, decrement byte count and wake up anyone
 389  * waiting for free bytes on queue.
 390  *
 391  * Called with queue locked.
 392  */
 393 static void
 394 msgunlink(kmsqid_t *qp, struct msg *mp)
 395 {
 396         list_remove(&qp->msg_list, mp);
 397         qp->msg_qnum--;
 398         qp->msg_cbytes -= mp->msg_size;
 399         msg_rele(mp);
 400 
 401         /* Wake up waiting writers */
 402         msg_wakeup_senders(qp);
 403 }
 404 
 405 static void
 406 msg_rmid(kipc_perm_t *perm)
 407 {
 408         kmsqid_t *qp = (kmsqid_t *)perm;
 409         struct msg *mp;
 410         int             ii;
 411 
 412 
 413         while ((mp = list_head(&qp->msg_list)) != NULL)
 414                 msgunlink(qp, mp);
 415         ASSERT(qp->msg_cbytes == 0);
 416 
 417         /*
 418          * Wake up everyone who is in a wait state of some sort
 419          * for this message queue.
 420          */
 421         for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
 422                 msg_rcvq_wakeup_all(&qp->msg_wait_snd[ii]);
 423                 msg_rcvq_wakeup_all(&qp->msg_wait_snd_ngt[ii]);
 424         }
 425         msg_rcvq_wakeup_all(&qp->msg_cpy_block);
 426         msg_rcvq_wakeup_all(&qp->msg_wait_rcv);
 427 }
 428 
 429 /*
 430  * msgctl system call.
 431  *
 432  * gets q lock (via ipc_lookup), releases before return.
 433  * may call users of msg_lock
 434  */
 435 static int
 436 msgctl(int msgid, int cmd, void *arg)
 437 {
 438         STRUCT_DECL(msqid_ds, ds);              /* SVR4 queue work area */
 439         kmsqid_t                *qp;            /* ptr to associated q */
 440         int                     error;
 441         struct  cred            *cr;
 442         model_t mdl = get_udatamodel();
 443         struct msqid_ds64       ds64;
 444         kmutex_t                *lock;
 445         proc_t                  *pp = curproc;
 446 
 447         STRUCT_INIT(ds, mdl);
 448         cr = CRED();
 449 
 450         /*
 451          * Perform pre- or non-lookup actions (e.g. copyins, RMID).
 452          */
 453         switch (cmd) {
 454         case IPC_SET:
 455                 if (copyin(arg, STRUCT_BUF(ds), STRUCT_SIZE(ds)))
 456                         return (set_errno(EFAULT));
 457                 break;
 458 
 459         case IPC_SET64:
 460                 if (copyin(arg, &ds64, sizeof (struct msqid_ds64)))
 461                         return (set_errno(EFAULT));
 462                 break;
 463 
 464         case IPC_RMID:
 465                 if (error = ipc_rmid(msq_svc, msgid, cr))
 466                         return (set_errno(error));
 467                 return (0);
 468         }
 469 
 470         /*
 471          * get msqid_ds for this msgid
 472          */
 473         if ((lock = ipc_lookup(msq_svc, msgid, (kipc_perm_t **)&qp)) == NULL)
 474                 return (set_errno(EINVAL));
 475 
 476         switch (cmd) {
 477         case IPC_SET:
 478                 if (STRUCT_FGET(ds, msg_qbytes) > qp->msg_qbytes &&
 479                     secpolicy_ipc_config(cr) != 0) {
 480                         mutex_exit(lock);
 481                         return (set_errno(EPERM));
 482                 }
 483                 if (error = ipcperm_set(msq_svc, cr, &qp->msg_perm,
 484                     &STRUCT_BUF(ds)->msg_perm, mdl)) {
 485                         mutex_exit(lock);
 486                         return (set_errno(error));
 487                 }
 488                 qp->msg_qbytes = STRUCT_FGET(ds, msg_qbytes);
 489                 qp->msg_ctime = gethrestime_sec();
 490                 break;
 491 
 492         case IPC_STAT:
 493                 if (error = ipcperm_access(&qp->msg_perm, MSG_R, cr)) {
 494                         mutex_exit(lock);
 495                         return (set_errno(error));
 496                 }
 497 
 498                 if (qp->msg_rcv_cnt)
 499                         qp->msg_perm.ipc_mode |= MSG_RWAIT;
 500                 if (qp->msg_snd_cnt)
 501                         qp->msg_perm.ipc_mode |= MSG_WWAIT;
 502                 ipcperm_stat(&STRUCT_BUF(ds)->msg_perm, &qp->msg_perm, mdl);
 503                 qp->msg_perm.ipc_mode &= ~(MSG_RWAIT|MSG_WWAIT);
 504                 STRUCT_FSETP(ds, msg_first, NULL);      /* kernel addr */
 505                 STRUCT_FSETP(ds, msg_last, NULL);
 506                 STRUCT_FSET(ds, msg_cbytes, qp->msg_cbytes);
 507                 STRUCT_FSET(ds, msg_qnum, qp->msg_qnum);
 508                 STRUCT_FSET(ds, msg_qbytes, qp->msg_qbytes);
 509                 STRUCT_FSET(ds, msg_lspid, qp->msg_lspid);
 510                 STRUCT_FSET(ds, msg_lrpid, qp->msg_lrpid);
 511                 STRUCT_FSET(ds, msg_stime, qp->msg_stime);
 512                 STRUCT_FSET(ds, msg_rtime, qp->msg_rtime);
 513                 STRUCT_FSET(ds, msg_ctime, qp->msg_ctime);
 514                 break;
 515 
 516         case IPC_SET64:
 517                 mutex_enter(&pp->p_lock);
 518                 if ((ds64.msgx_qbytes > qp->msg_qbytes) &&
 519                     secpolicy_ipc_config(cr) != 0 &&
 520                     rctl_test(rc_process_msgmnb, pp->p_rctls, pp,
 521                     ds64.msgx_qbytes, RCA_SAFE) & RCT_DENY) {
 522                         mutex_exit(&pp->p_lock);
 523                         mutex_exit(lock);
 524                         return (set_errno(EPERM));
 525                 }
 526                 mutex_exit(&pp->p_lock);
 527                 if (error = ipcperm_set64(msq_svc, cr, &qp->msg_perm,
 528                     &ds64.msgx_perm)) {
 529                         mutex_exit(lock);
 530                         return (set_errno(error));
 531                 }
 532                 qp->msg_qbytes = ds64.msgx_qbytes;
 533                 qp->msg_ctime = gethrestime_sec();
 534                 break;
 535 
 536         case IPC_STAT64:
 537                 if (qp->msg_rcv_cnt)
 538                         qp->msg_perm.ipc_mode |= MSG_RWAIT;
 539                 if (qp->msg_snd_cnt)
 540                         qp->msg_perm.ipc_mode |= MSG_WWAIT;
 541                 ipcperm_stat64(&ds64.msgx_perm, &qp->msg_perm);
 542                 qp->msg_perm.ipc_mode &= ~(MSG_RWAIT|MSG_WWAIT);
 543                 ds64.msgx_cbytes = qp->msg_cbytes;
 544                 ds64.msgx_qnum = qp->msg_qnum;
 545                 ds64.msgx_qbytes = qp->msg_qbytes;
 546                 ds64.msgx_lspid = qp->msg_lspid;
 547                 ds64.msgx_lrpid = qp->msg_lrpid;
 548                 ds64.msgx_stime = qp->msg_stime;
 549                 ds64.msgx_rtime = qp->msg_rtime;
 550                 ds64.msgx_ctime = qp->msg_ctime;
 551                 break;
 552 
 553         default:
 554                 mutex_exit(lock);
 555                 return (set_errno(EINVAL));
 556         }
 557 
 558         mutex_exit(lock);
 559 
 560         /*
 561          * Do copyout last (after releasing mutex).
 562          */
 563         switch (cmd) {
 564         case IPC_STAT:
 565                 if (copyout(STRUCT_BUF(ds), arg, STRUCT_SIZE(ds)))
 566                         return (set_errno(EFAULT));
 567                 break;
 568 
 569         case IPC_STAT64:
 570                 if (copyout(&ds64, arg, sizeof (struct msqid_ds64)))
 571                         return (set_errno(EFAULT));
 572                 break;
 573         }
 574 
 575         return (0);
 576 }
 577 
 578 /*
 579  * Remove all message queues associated with a given zone.  Called by
 580  * zone_shutdown when the zone is halted.
 581  */
 582 /*ARGSUSED1*/
 583 static void
 584 msg_remove_zone(zoneid_t zoneid, void *arg)
 585 {
 586         ipc_remove_zone(msq_svc, zoneid);
 587 }
 588 
 589 /*
 590  * msgget system call.
 591  */
 592 static int
 593 msgget(key_t key, int msgflg)
 594 {
 595         kmsqid_t        *qp;
 596         kmutex_t        *lock;
 597         int             id, error;
 598         int             ii;
 599         proc_t          *pp = curproc;
 600 
 601 top:
 602         if (error = ipc_get(msq_svc, key, msgflg, (kipc_perm_t **)&qp, &lock))
 603                 return (set_errno(error));
 604 
 605         if (IPC_FREE(&qp->msg_perm)) {
 606                 mutex_exit(lock);
 607                 mutex_exit(&pp->p_lock);
 608 
 609                 list_create(&qp->msg_list, sizeof (struct msg),
 610                     offsetof(struct msg, msg_node));
 611                 qp->msg_qnum = 0;
 612                 qp->msg_lspid = qp->msg_lrpid = 0;
 613                 qp->msg_stime = qp->msg_rtime = 0;
 614                 qp->msg_ctime = gethrestime_sec();
 615                 qp->msg_ngt_cnt = 0;
 616                 qp->msg_neg_copy = 0;
 617                 for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
 618                         list_create(&qp->msg_wait_snd[ii],
 619                             sizeof (msgq_wakeup_t),
 620                             offsetof(msgq_wakeup_t, msgw_list));
 621                         list_create(&qp->msg_wait_snd_ngt[ii],
 622                             sizeof (msgq_wakeup_t),
 623                             offsetof(msgq_wakeup_t, msgw_list));
 624                 }
 625                 /*
 626                  * The proper initialization of msg_lowest_type is to the
 627                  * highest possible value.  By doing this we guarantee that
 628                  * when the first send happens, the lowest type will be set
 629                  * properly.
 630                  */
 631                 qp->msg_lowest_type = MSG_SMALL_INIT;
 632                 list_create(&qp->msg_cpy_block,
 633                     sizeof (msgq_wakeup_t),
 634                     offsetof(msgq_wakeup_t, msgw_list));
 635                 list_create(&qp->msg_wait_rcv,
 636                     sizeof (msgq_wakeup_t),
 637                     offsetof(msgq_wakeup_t, msgw_list));
 638                 qp->msg_fnd_sndr = &msg_fnd_sndr[0];
 639                 qp->msg_fnd_rdr = &msg_fnd_rdr[0];
 640                 qp->msg_rcv_cnt = 0;
 641                 qp->msg_snd_cnt = 0;
 642                 qp->msg_snd_smallest = MSG_SMALL_INIT;
 643 
 644                 if (error = ipc_commit_begin(msq_svc, key, msgflg,
 645                     (kipc_perm_t *)qp)) {
 646                         if (error == EAGAIN)
 647                                 goto top;
 648                         return (set_errno(error));
 649                 }
 650                 qp->msg_qbytes = rctl_enforced_value(rc_process_msgmnb,
 651                     pp->p_rctls, pp);
 652                 qp->msg_qmax = rctl_enforced_value(rc_process_msgtql,
 653                     pp->p_rctls, pp);
 654                 lock = ipc_commit_end(msq_svc, &qp->msg_perm);
 655         }
 656 
 657         if (AU_AUDITING())
 658                 audit_ipcget(AT_IPC_MSG, (void *)qp);
 659 
 660         id = qp->msg_perm.ipc_id;
 661         mutex_exit(lock);
 662         return (id);
 663 }
 664 
 665 static ssize_t
 666 msgrcv(int msqid, struct ipcmsgbuf *msgp, size_t msgsz, long msgtyp, int msgflg)
 667 {
 668         struct msg      *smp;   /* ptr to best msg on q */
 669         kmsqid_t        *qp;    /* ptr to associated q */
 670         kmutex_t        *lock;
 671         size_t          xtsz;   /* transfer byte count */
 672         int             error = 0;
 673         int             cvres;
 674         uint_t          msg_hash;
 675         msgq_wakeup_t   msg_entry;
 676 
 677         CPU_STATS_ADDQ(CPU, sys, msg, 1);       /* bump msg send/rcv count */
 678 
 679         msg_hash = msg_type_hash(msgtyp);
 680         if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL) {
 681                 return ((ssize_t)set_errno(EINVAL));
 682         }
 683         ipc_hold(msq_svc, (kipc_perm_t *)qp);
 684 
 685         if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED())) {
 686                 goto msgrcv_out;
 687         }
 688 
 689         /*
 690          * Various information (including the condvar_t) required for the
 691          * process to sleep is provided by it's stack.
 692          */
 693         msg_entry.msgw_thrd = curthread;
 694         msg_entry.msgw_snd_wake = 0;
 695         msg_entry.msgw_type = msgtyp;
 696 findmsg:
 697         smp = msgrcv_lookup(qp, msgtyp);
 698 
 699         if (smp) {
 700                 /*
 701                  * We found a possible message to copy out.
 702                  */
 703                 if ((smp->msg_flags & MSG_RCVCOPY) == 0) {
 704                         long t = msg_entry.msgw_snd_wake;
 705                         long copy_type = smp->msg_type;
 706 
 707                         /*
 708                          * It is available, attempt to copy it.
 709                          */
 710                         error = msg_copyout(qp, msgtyp, &lock, &xtsz, msgsz,
 711                             smp, msgp, msgflg);
 712 
 713                         /*
 714                          * It is possible to consume a different message
 715                          * type then what originally awakened for (negative
 716                          * types).  If this happens a check must be done to
 717                          * to determine if another receiver is available
 718                          * for the waking message type,  Failure to do this
 719                          * can result in a message on the queue that can be
 720                          * serviced by a sleeping receiver.
 721                          */
 722                         if (!error && t && (copy_type != t))
 723                                 msg_wakeup_rdr(qp, &qp->msg_fnd_sndr, t);
 724 
 725                         /*
 726                          * Don't forget to wakeup a sleeper that blocked because
 727                          * we were copying things out.
 728                          */
 729                         msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0);
 730                         goto msgrcv_out;
 731                 }
 732                 /*
 733                  * The selected message is being copied out, so block.  We do
 734                  * not need to wake the next person up on the msg_cpy_block list
 735                  * due to the fact some one is copying out and they will get
 736                  * things moving again once the copy is completed.
 737                  */
 738                 cvres = msg_rcvq_sleep(&qp->msg_cpy_block,
 739                     &msg_entry, &lock, qp);
 740                 error = msgq_check_err(qp, cvres);
 741                 if (error) {
 742                         goto msgrcv_out;
 743                 }
 744                 goto findmsg;
 745         }
 746         /*
 747          * There isn't a message to copy out that matches the designated
 748          * criteria.
 749          */
 750         if (msgflg & IPC_NOWAIT) {
 751                 error = ENOMSG;
 752                 goto msgrcv_out;
 753         }
 754         msg_wakeup_rdr(qp,  &qp->msg_fnd_rdr, 0);
 755 
 756         /*
 757          * Wait for new message.  We keep the negative and positive types
 758          * separate for performance reasons.
 759          */
 760         msg_entry.msgw_snd_wake = 0;
 761         if (msgtyp >= 0) {
 762                 cvres = msg_rcvq_sleep(&qp->msg_wait_snd[msg_hash],
 763                     &msg_entry, &lock, qp);
 764         } else {
 765                 qp->msg_ngt_cnt++;
 766                 cvres = msg_rcvq_sleep(&qp->msg_wait_snd_ngt[msg_hash],
 767                     &msg_entry, &lock, qp);
 768                 qp->msg_ngt_cnt--;
 769         }
 770 
 771         if (!(error = msgq_check_err(qp, cvres))) {
 772                 goto findmsg;
 773         }
 774 
 775 msgrcv_out:
 776         if (error) {
 777                 msg_wakeup_rdr(qp,  &qp->msg_fnd_rdr, 0);
 778                 if (msg_entry.msgw_snd_wake) {
 779                         msg_wakeup_rdr(qp, &qp->msg_fnd_sndr,
 780                             msg_entry.msgw_snd_wake);
 781                 }
 782                 ipc_rele(msq_svc, (kipc_perm_t *)qp);
 783                 return ((ssize_t)set_errno(error));
 784         }
 785         ipc_rele(msq_svc, (kipc_perm_t *)qp);
 786         return ((ssize_t)xtsz);
 787 }
 788 
 789 static int
 790 msgq_check_err(kmsqid_t *qp, int cvres)
 791 {
 792         if (IPC_FREE(&qp->msg_perm)) {
 793                 return (EIDRM);
 794         }
 795 
 796         if (cvres == 0) {
 797                 return (EINTR);
 798         }
 799 
 800         return (0);
 801 }
 802 
 803 static int
 804 msg_copyout(kmsqid_t *qp, long msgtyp, kmutex_t **lock, size_t *xtsz_ret,
 805     size_t msgsz, struct msg *smp, struct ipcmsgbuf *msgp, int msgflg)
 806 {
 807         size_t          xtsz;
 808         STRUCT_HANDLE(ipcmsgbuf, umsgp);
 809         model_t         mdl = get_udatamodel();
 810         int             copyerror = 0;
 811 
 812         STRUCT_SET_HANDLE(umsgp, mdl, msgp);
 813         if (msgsz < smp->msg_size) {
 814                 if ((msgflg & MSG_NOERROR) == 0) {
 815                         return (E2BIG);
 816                 } else {
 817                         xtsz = msgsz;
 818                 }
 819         } else {
 820                 xtsz = smp->msg_size;
 821         }
 822         *xtsz_ret = xtsz;
 823 
 824         /*
 825          * To prevent a DOS attack we mark the message as being
 826          * copied out and release mutex.  When the copy is completed
 827          * we need to acquire the mutex and make the appropriate updates.
 828          */
 829         ASSERT((smp->msg_flags & MSG_RCVCOPY) == 0);
 830         smp->msg_flags |= MSG_RCVCOPY;
 831         msg_hold(smp);
 832         if (msgtyp < 0) {
 833                 ASSERT(qp->msg_neg_copy == 0);
 834                 qp->msg_neg_copy = 1;
 835         }
 836         mutex_exit(*lock);
 837 
 838         if (mdl == DATAMODEL_NATIVE) {
 839                 copyerror = copyout(&smp->msg_type, msgp,
 840                     sizeof (smp->msg_type));
 841         } else {
 842                 /*
 843                  * 32-bit callers need an imploded msg type.
 844                  */
 845                 int32_t msg_type32 = smp->msg_type;
 846 
 847                 copyerror = copyout(&msg_type32, msgp,
 848                     sizeof (msg_type32));
 849         }
 850 
 851         if (copyerror == 0 && xtsz) {
 852                 copyerror = copyout(smp->msg_addr,
 853                     STRUCT_FADDR(umsgp, mtext), xtsz);
 854         }
 855 
 856         /*
 857          * Reclaim the mutex and make sure the message queue still exists.
 858          */
 859 
 860         *lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
 861         if (msgtyp < 0) {
 862                 qp->msg_neg_copy = 0;
 863         }
 864         ASSERT(smp->msg_flags & MSG_RCVCOPY);
 865         smp->msg_flags &= ~MSG_RCVCOPY;
 866         msg_rele(smp);
 867         if (IPC_FREE(&qp->msg_perm)) {
 868                 return (EIDRM);
 869         }
 870         if (copyerror) {
 871                 return (EFAULT);
 872         }
 873         qp->msg_lrpid = ttoproc(curthread)->p_pid;
 874         qp->msg_rtime = gethrestime_sec();
 875         msgunlink(qp, smp);
 876         return (0);
 877 }
 878 
 879 static struct msg *
 880 msgrcv_lookup(kmsqid_t *qp, long msgtyp)
 881 {
 882         struct msg              *smp = NULL;
 883         long                    qp_low;
 884         struct msg              *mp;    /* ptr to msg on q */
 885         long                    low_msgtype;
 886         static struct msg       neg_copy_smp;
 887 
 888         mp = list_head(&qp->msg_list);
 889         if (msgtyp == 0) {
 890                 smp = mp;
 891         } else {
 892                 qp_low = qp->msg_lowest_type;
 893                 if (msgtyp > 0) {
 894                         /*
 895                          * If our lowest possible message type is larger than
 896                          * the message type desired, then we know there is
 897                          * no entry present.
 898                          */
 899                         if (qp_low > msgtyp) {
 900                                 return (NULL);
 901                         }
 902 
 903                         for (; mp; mp = list_next(&qp->msg_list, mp)) {
 904                                 if (msgtyp == mp->msg_type) {
 905                                         smp = mp;
 906                                         break;
 907                                 }
 908                         }
 909                 } else {
 910                         /*
 911                          * We have kept track of the lowest possible message
 912                          * type on the send queue.  This allows us to terminate
 913                          * the search early if we find a message type of that
 914                          * type.  Note, the lowest type may not be the actual
 915                          * lowest value in the system, it is only guaranteed
 916                          * that there isn't a value lower than that.
 917                          */
 918                         low_msgtype = -msgtyp;
 919                         if (low_msgtype < qp_low) {
 920                                 return (NULL);
 921                         }
 922                         if (qp->msg_neg_copy) {
 923                                 neg_copy_smp.msg_flags = MSG_RCVCOPY;
 924                                 return (&neg_copy_smp);
 925                         }
 926                         for (; mp; mp = list_next(&qp->msg_list, mp)) {
 927                                 if (mp->msg_type <= low_msgtype &&
 928                                     !(smp && smp->msg_type <= mp->msg_type)) {
 929                                         smp = mp;
 930                                         low_msgtype = mp->msg_type;
 931                                         if (low_msgtype == qp_low) {
 932                                                 break;
 933                                         }
 934                                 }
 935                         }
 936                         if (smp) {
 937                                 /*
 938                                  * Update the lowest message type.
 939                                  */
 940                                 qp->msg_lowest_type = smp->msg_type;
 941                         }
 942                 }
 943         }
 944         return (smp);
 945 }
 946 
 947 /*
 948  * msgids system call.
 949  */
 950 static int
 951 msgids(int *buf, uint_t nids, uint_t *pnids)
 952 {
 953         int error;
 954 
 955         if (error = ipc_ids(msq_svc, buf, nids, pnids))
 956                 return (set_errno(error));
 957 
 958         return (0);
 959 }
 960 
/*
 * Size rounding used by msgsnap() when laying out message text in the
 * user buffer: RND() applies roundup() with sizeof (size_t) and is used
 * for native data model callers, RND32() applies it with sizeof (size32_t)
 * and is used for all other callers.
 */
#define RND(x)          roundup((x), sizeof (size_t))
#define RND32(x)        roundup((x), sizeof (size32_t))
 963 
 964 /*
 965  * msgsnap system call.
 966  */
 967 static int
 968 msgsnap(int msqid, caddr_t buf, size_t bufsz, long msgtyp)
 969 {
 970         struct msg      *mp;    /* ptr to msg on q */
 971         kmsqid_t        *qp;    /* ptr to associated q */
 972         kmutex_t        *lock;
 973         size_t          size;
 974         size_t          nmsg;
 975         struct msg      **snaplist;
 976         int             error, i;
 977         model_t         mdl = get_udatamodel();
 978         STRUCT_DECL(msgsnap_head, head);
 979         STRUCT_DECL(msgsnap_mhead, mhead);
 980 
 981         STRUCT_INIT(head, mdl);
 982         STRUCT_INIT(mhead, mdl);
 983 
 984         if (bufsz < STRUCT_SIZE(head))
 985                 return (set_errno(EINVAL));
 986 
 987         if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL)
 988                 return (set_errno(EINVAL));
 989 
 990         if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED())) {
 991                 mutex_exit(lock);
 992                 return (set_errno(error));
 993         }
 994         ipc_hold(msq_svc, (kipc_perm_t *)qp);
 995 
 996         /*
 997          * First compute the required buffer size and
 998          * the number of messages on the queue.
 999          */
1000         size = nmsg = 0;
1001         for (mp = list_head(&qp->msg_list); mp;
1002             mp = list_next(&qp->msg_list, mp)) {
1003                 if (msgtyp == 0 ||
1004                     (msgtyp > 0 && msgtyp == mp->msg_type) ||
1005                     (msgtyp < 0 && mp->msg_type <= -msgtyp)) {
1006                         nmsg++;
1007                         if (mdl == DATAMODEL_NATIVE)
1008                                 size += RND(mp->msg_size);
1009                         else
1010                                 size += RND32(mp->msg_size);
1011                 }
1012         }
1013 
1014         size += STRUCT_SIZE(head) + nmsg * STRUCT_SIZE(mhead);
1015         if (size > bufsz)
1016                 nmsg = 0;
1017 
1018         if (nmsg > 0) {
1019                 /*
1020                  * Mark the messages as being copied.
1021                  */
1022                 snaplist = (struct msg **)kmem_alloc(nmsg *
1023                     sizeof (struct msg *), KM_SLEEP);
1024                 i = 0;
1025                 for (mp = list_head(&qp->msg_list); mp;
1026                     mp = list_next(&qp->msg_list, mp)) {
1027                         if (msgtyp == 0 ||
1028                             (msgtyp > 0 && msgtyp == mp->msg_type) ||
1029                             (msgtyp < 0 && mp->msg_type <= -msgtyp)) {
1030                                 msg_hold(mp);
1031                                 snaplist[i] = mp;
1032                                 i++;
1033                         }
1034                 }
1035         }
1036         mutex_exit(lock);
1037 
1038         /*
1039          * Copy out the buffer header.
1040          */
1041         STRUCT_FSET(head, msgsnap_size, size);
1042         STRUCT_FSET(head, msgsnap_nmsg, nmsg);
1043         if (copyout(STRUCT_BUF(head), buf, STRUCT_SIZE(head)))
1044                 error = EFAULT;
1045 
1046         buf += STRUCT_SIZE(head);
1047 
1048         /*
1049          * Now copy out the messages one by one.
1050          */
1051         for (i = 0; i < nmsg; i++) {
1052                 mp = snaplist[i];
1053                 if (error == 0) {
1054                         STRUCT_FSET(mhead, msgsnap_mlen, mp->msg_size);
1055                         STRUCT_FSET(mhead, msgsnap_mtype, mp->msg_type);
1056                         if (copyout(STRUCT_BUF(mhead), buf, STRUCT_SIZE(mhead)))
1057                                 error = EFAULT;
1058                         buf += STRUCT_SIZE(mhead);
1059 
1060                         if (error == 0 &&
1061                             mp->msg_size != 0 &&
1062                             copyout(mp->msg_addr, buf, mp->msg_size))
1063                                 error = EFAULT;
1064                         if (mdl == DATAMODEL_NATIVE)
1065                                 buf += RND(mp->msg_size);
1066                         else
1067                                 buf += RND32(mp->msg_size);
1068                 }
1069                 lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
1070                 msg_rele(mp);
1071                 /* Check for msg q deleted or reallocated */
1072                 if (IPC_FREE(&qp->msg_perm))
1073                         error = EIDRM;
1074                 mutex_exit(lock);
1075         }
1076 
1077         (void) ipc_lock(msq_svc, qp->msg_perm.ipc_id);
1078         ipc_rele(msq_svc, (kipc_perm_t *)qp);
1079 
1080         if (nmsg > 0)
1081                 kmem_free(snaplist, nmsg * sizeof (struct msg *));
1082 
1083         if (error)
1084                 return (set_errno(error));
1085         return (0);
1086 }
1087 
/*
 * Largest message size, in bytes, for which msgsnd() allocates the message
 * and copies in its text before it looks up (and locks) the queue.
 */
#define MSG_PREALLOC_LIMIT 8192

/*
 * msgsnd system call.
 *
 * Append a message of msgsz text bytes, read from the user buffer msgp, to
 * the queue msqid.  The message type is read from the start of msgp and
 * must be at least 1.
 *
 * When the queue has no room (bytes or message count) the caller sleeps on
 * qp->msg_wait_rcv until woken, unless IPC_NOWAIT is set in msgflg.
 *
 * Returns 0 on success.  Failures go through set_errno():
 *   EFAULT  the type or text could not be copied in
 *   EINVAL  type < 1, unknown queue, or msgsz > qp->msg_qbytes
 *   EAGAIN  no room and IPC_NOWAIT was given
 *   EIDRM   the queue was freed while the caller slept or copied
 *   EINTR   the sleep returned 0 (see msgq_check_err())
 *   or whatever ipcperm_access() reports for MSG_W.
 */
static int
msgsnd(int msqid, struct ipcmsgbuf *msgp, size_t msgsz, int msgflg)
{
        kmsqid_t        *qp;
        kmutex_t        *lock = NULL;
        struct msg      *mp = NULL;
        long            type;
        int             error = 0, wait_wakeup = 0;
        msgq_wakeup_t   msg_entry;
        model_t         mdl = get_udatamodel();
        STRUCT_HANDLE(ipcmsgbuf, umsgp);

        CPU_STATS_ADDQ(CPU, sys, msg, 1);       /* bump msg send/rcv count */
        STRUCT_SET_HANDLE(umsgp, mdl, msgp);

        /*
         * Fetch the message type; non-native callers supply a 32-bit
         * value that is widened to long.
         */
        if (mdl == DATAMODEL_NATIVE) {
                if (copyin(msgp, &type, sizeof (type)))
                        return (set_errno(EFAULT));
        } else {
                int32_t type32;
                if (copyin(msgp, &type32, sizeof (type32)))
                        return (set_errno(EFAULT));
                type = type32;
        }

        if (type < 1)
                return (set_errno(EINVAL));

        /*
         * We want the value here large enough that most of the
         * message operations will use the "lockless" path,
         * but small enough that a user can not reserve large
         * chunks of kernel memory unless they have a valid
         * reason to.
         */
        if (msgsz <= MSG_PREALLOC_LIMIT) {
                /*
                 * We are small enough that we can afford to do the
                 * allocation now.  This saves dropping the lock
                 * and then reacquiring the lock.
                 */
                mp = kmem_zalloc(sizeof (struct msg), KM_SLEEP);
                mp->msg_copycnt = 1;
                mp->msg_size = msgsz;
                if (msgsz) {
                        mp->msg_addr = kmem_alloc(msgsz, KM_SLEEP);
                        if (copyin(STRUCT_FADDR(umsgp, mtext),
                            mp->msg_addr, msgsz) == -1) {
                                /* lock is still NULL; only mp is cleaned up */
                                error = EFAULT;
                                goto msgsnd_out;
                        }
                }
        }

        if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL) {
                error = EINVAL;
                goto msgsnd_out;
        }

        /* From here on msgsnd_out also gives back this hold on the queue. */
        ipc_hold(msq_svc, (kipc_perm_t *)qp);

        /* A message larger than the whole queue can never be queued. */
        if (msgsz > qp->msg_qbytes) {
                error = EINVAL;
                goto msgsnd_out;
        }

        if (error = ipcperm_access(&qp->msg_perm, MSG_W, CRED()))
                goto msgsnd_out;

top:
        /*
         * Allocate space on q, message header, & buffer space.
         */
        ASSERT(qp->msg_qnum <= qp->msg_qmax);
        while ((msgsz > qp->msg_qbytes - qp->msg_cbytes) ||
            (qp->msg_qnum == qp->msg_qmax)) {
                int cvres;

                if (msgflg & IPC_NOWAIT) {
                        error = EAGAIN;
                        goto msgsnd_out;
                }

                /*
                 * Queue ourselves on the sender wait list and sleep.
                 * msg_snd_smallest tracks the smallest pending send so
                 * msg_wakeup_senders() can tell when nobody would fit.
                 */
                wait_wakeup = 0;
                qp->msg_snd_cnt++;
                msg_entry.msgw_snd_size = msgsz;
                msg_entry.msgw_thrd = curthread;
                msg_entry.msgw_type = type;
                cv_init(&msg_entry.msgw_wake_cv, NULL, 0, NULL);
                list_insert_tail(&qp->msg_wait_rcv, &msg_entry);
                if (qp->msg_snd_smallest > msgsz)
                        qp->msg_snd_smallest = msgsz;
                cvres = cv_wait_sig(&msg_entry.msgw_wake_cv, lock);
                lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, lock);
                qp->msg_snd_cnt--;
                /* Still linked means whoever woke us did not unlink us. */
                if (list_link_active(&msg_entry.msgw_list))
                        list_remove(&qp->msg_wait_rcv, &msg_entry);
                if (error = msgq_check_err(qp, cvres)) {
                        goto msgsnd_out;
                }
                /*
                 * We were woken without error; if we fail from here on,
                 * msgsnd_out passes the wakeup on to other senders.
                 */
                wait_wakeup = 1;
        }

        if (mp == NULL) {
                int failure;

                /*
                 * Large message: the allocation and copyin are done with
                 * the lock dropped, so afterwards the queue must be
                 * checked for removal and the space test repeated (top).
                 */
                mutex_exit(lock);
                ASSERT(msgsz > 0);
                mp = kmem_zalloc(sizeof (struct msg), KM_SLEEP);
                mp->msg_addr = kmem_alloc(msgsz, KM_SLEEP);
                mp->msg_size = msgsz;
                mp->msg_copycnt = 1;

                failure = (copyin(STRUCT_FADDR(umsgp, mtext),
                    mp->msg_addr, msgsz) == -1);
                lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
                if (IPC_FREE(&qp->msg_perm)) {
                        error = EIDRM;
                        goto msgsnd_out;
                }
                if (failure) {
                        error = EFAULT;
                        goto msgsnd_out;
                }
                goto top;
        }

        /*
         * Everything is available, put msg on q.
         */
        qp->msg_qnum++;
        qp->msg_cbytes += msgsz;
        qp->msg_lspid = curproc->p_pid;
        qp->msg_stime = gethrestime_sec();
        mp->msg_type = type;
        /* Keep the lower bound used by msgrcv_lookup() up to date. */
        if (qp->msg_lowest_type > type)
                qp->msg_lowest_type = type;
        list_insert_tail(&qp->msg_list, mp);
        /*
         * Get the proper receiver going.
         */
        msg_wakeup_rdr(qp, &qp->msg_fnd_sndr, type);

msgsnd_out:
        /*
         * We were woken up from the send wait list, but an
         * error occurred on placing the message onto the
         * msg queue.  Given that, we need to do the wakeup
         * dance again.
         */

        if (wait_wakeup && error) {
                msg_wakeup_senders(qp);
        }
        /* lock is non-NULL only once the queue was looked up and held. */
        if (lock)
                ipc_rele(msq_svc, (kipc_perm_t *)qp);   /* drops lock */

        if (error) {
                /* The message never made it onto the queue; let go of it. */
                if (mp)
                        msg_rele(mp);
                return (set_errno(error));
        }

        return (0);
}
1258 
1259 static void
1260 msg_wakeup_rdr(kmsqid_t *qp, msg_select_t **flist, long type)
1261 {
1262         msg_select_t    *walker = *flist;
1263         msgq_wakeup_t   *wakeup;
1264         uint_t          msg_hash;
1265 
1266         msg_hash = msg_type_hash(type);
1267 
1268         do {
1269                 wakeup = walker->selection(qp, msg_hash, type);
1270                 walker = walker->next_selection;
1271         } while (!wakeup && walker != *flist);
1272 
1273         *flist = (*flist)->next_selection;
1274         if (wakeup) {
1275                 if (type) {
1276                         wakeup->msgw_snd_wake = type;
1277                 }
1278                 cv_signal(&wakeup->msgw_wake_cv);
1279         }
1280 }
1281 
1282 static uint_t
1283 msg_type_hash(long msg_type)
1284 {
1285         if (msg_type < 0) {
1286                 long    hash = -msg_type / MSG_NEG_INTERVAL;
1287                 /*
1288                  * Negative message types are hashed over an
1289                  * interval.  Any message type that hashes
1290                  * beyond MSG_MAX_QNUM is automatically placed
1291                  * in the last bucket.
1292                  */
1293                 if (hash > MSG_MAX_QNUM)
1294                         hash = MSG_MAX_QNUM;
1295                 return (hash);
1296         }
1297 
1298         /*
1299          * 0 or positive message type.  The first bucket is reserved for
1300          * message receivers of type 0, the other buckets we hash into.
1301          */
1302         if (msg_type)
1303                 return (1 + (msg_type % MSG_MAX_QNUM));
1304         return (0);
1305 }
1306 
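/*
 * Routines that pick a receiver without regard to message type:
 * msg_fnd_any_snd() looks at the receivers of type 0 that are blocked
 * waiting for a message, and msg_fnd_any_rdr() at the receivers that are
 * blocked while a message is being copied out.  Each simply removes and
 * returns the first waiter on its list.
 */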
1311 
1312 static msgq_wakeup_t *
1313 /* ARGSUSED */
1314 msg_fnd_any_snd(kmsqid_t *qp, int msg_hash, long type)
1315 {
1316         msgq_wakeup_t   *walker;
1317 
1318         walker = list_head(&qp->msg_wait_snd[0]);
1319 
1320         if (walker)
1321                 list_remove(&qp->msg_wait_snd[0], walker);
1322         return (walker);
1323 }
1324 
1325 static msgq_wakeup_t *
1326 /* ARGSUSED */
1327 msg_fnd_any_rdr(kmsqid_t *qp, int msg_hash, long type)
1328 {
1329         msgq_wakeup_t   *walker;
1330 
1331         walker = list_head(&qp->msg_cpy_block);
1332         if (walker)
1333                 list_remove(&qp->msg_cpy_block, walker);
1334         return (walker);
1335 }
1336 
1337 static msgq_wakeup_t *
1338 msg_fnd_spc_snd(kmsqid_t *qp, int msg_hash, long type)
1339 {
1340         msgq_wakeup_t   *walker;
1341 
1342         walker = list_head(&qp->msg_wait_snd[msg_hash]);
1343 
1344         while (walker && walker->msgw_type != type)
1345                 walker = list_next(&qp->msg_wait_snd[msg_hash], walker);
1346         if (walker)
1347                 list_remove(&qp->msg_wait_snd[msg_hash], walker);
1348         return (walker);
1349 }
1350 
1351 /* ARGSUSED */
1352 static msgq_wakeup_t *
1353 msg_fnd_neg_snd(kmsqid_t *qp, int msg_hash, long type)
1354 {
1355         msgq_wakeup_t   *qptr;
1356         int             count;
1357         int             check_index;
1358         int             neg_index;
1359         int             nbuckets;
1360 
1361         if (!qp->msg_ngt_cnt) {
1362                 return (NULL);
1363         }
1364         neg_index = msg_type_hash(-type);
1365 
1366         /*
1367          * Check for a match among the negative type queues.  Any buckets
1368          * at neg_index or larger can match the type.  Use the last send
1369          * time to randomize the starting bucket to prevent starvation.
1370          * Search all buckets from neg_index to MSG_MAX_QNUM, starting
1371          * from the random starting point, and wrapping around after
1372          * MSG_MAX_QNUM.
1373          */
1374 
1375         nbuckets = MSG_MAX_QNUM - neg_index + 1;
1376         check_index = neg_index + (qp->msg_stime % nbuckets);
1377 
1378         for (count = nbuckets; count > 0; count--) {
1379                 qptr = list_head(&qp->msg_wait_snd_ngt[check_index]);
1380                 while (qptr) {
1381                         /*
1382                          * The lowest hash bucket may actually contain
1383                          * message types that are not valid for this
1384                          * request.  This can happen due to the fact that
1385                          * the message buckets actually contain a consecutive
1386                          * range of types.
1387                          */
1388                         if (-qptr->msgw_type >= type) {
1389                                 list_remove(&qp->msg_wait_snd_ngt[check_index],
1390                                     qptr);
1391                                 return (qptr);
1392                         }
1393                         qptr = list_next(&qp->msg_wait_snd_ngt[check_index],
1394                             qptr);
1395                 }
1396                 if (++check_index > MSG_MAX_QNUM) {
1397                         check_index = neg_index;
1398                 }
1399         }
1400         return (NULL);
1401 }
1402 
1403 static int
1404 msg_rcvq_sleep(list_t *queue, msgq_wakeup_t *entry, kmutex_t **lock,
1405     kmsqid_t *qp)
1406 {
1407         int             cvres;
1408 
1409         cv_init(&entry->msgw_wake_cv, NULL, 0, NULL);
1410 
1411         list_insert_tail(queue, entry);
1412 
1413         qp->msg_rcv_cnt++;
1414         cvres = cv_wait_sig(&entry->msgw_wake_cv, *lock);
1415         *lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, *lock);
1416         qp->msg_rcv_cnt--;
1417 
1418         if (list_link_active(&entry->msgw_list)) {
1419                 /*
1420                  * We woke up unexpectedly, remove ourself.
1421                  */
1422                 list_remove(queue, entry);
1423         }
1424 
1425         return (cvres);
1426 }
1427 
1428 static void
1429 msg_rcvq_wakeup_all(list_t *q_ptr)
1430 {
1431         msgq_wakeup_t   *q_walk;
1432 
1433         while (q_walk = list_head(q_ptr)) {
1434                 list_remove(q_ptr, q_walk);
1435                 cv_signal(&q_walk->msgw_wake_cv);
1436         }
1437 }
1438 
1439 /*
1440  * msgsys - System entry point for msgctl, msgget, msgrcv, and msgsnd
1441  * system calls.
1442  */
1443 static ssize_t
1444 msgsys(int opcode, uintptr_t a1, uintptr_t a2, uintptr_t a3,
1445         uintptr_t a4, uintptr_t a5)
1446 {
1447         ssize_t error;
1448 
1449         switch (opcode) {
1450         case MSGGET:
1451                 error = msgget((key_t)a1, (int)a2);
1452                 break;
1453         case MSGCTL:
1454                 error = msgctl((int)a1, (int)a2, (void *)a3);
1455                 break;
1456         case MSGRCV:
1457                 error = msgrcv((int)a1, (struct ipcmsgbuf *)a2,
1458                     (size_t)a3, (long)a4, (int)a5);
1459                 break;
1460         case MSGSND:
1461                 error = msgsnd((int)a1, (struct ipcmsgbuf *)a2,
1462                     (size_t)a3, (int)a4);
1463                 break;
1464         case MSGIDS:
1465                 error = msgids((int *)a1, (uint_t)a2, (uint_t *)a3);
1466                 break;
1467         case MSGSNAP:
1468                 error = msgsnap((int)a1, (caddr_t)a2, (size_t)a3, (long)a4);
1469                 break;
1470         default:
1471                 error = set_errno(EINVAL);
1472                 break;
1473         }
1474 
1475         return (error);
1476 }
1477 
1478 /*
1479  * Determine if a writer who is waiting can process its message.  If so
1480  * wake it up.
1481  */
1482 static void
1483 msg_wakeup_senders(kmsqid_t *qp)
1484 
1485 {
1486         struct msgq_wakeup *ptr, *optr;
1487         size_t avail, smallest;
1488         int msgs_out;
1489 
1490         /*
1491          * Is there a writer waiting, and if so, can it be serviced? If
1492          * not return back to the caller.
1493          */
1494         if (IPC_FREE(&qp->msg_perm) || qp->msg_qnum >= qp->msg_qmax)
1495                 return;
1496 
1497         avail = qp->msg_qbytes - qp->msg_cbytes;
1498         if (avail < qp->msg_snd_smallest)
1499                 return;
1500 
1501         ptr = list_head(&qp->msg_wait_rcv);
1502         if (ptr == NULL) {
1503                 qp->msg_snd_smallest = MSG_SMALL_INIT;
1504                 return;
1505         }
1506         optr = ptr;
1507 
1508         /*
1509          * smallest:    minimum message size of all queued writers
1510          *
1511          * avail:       amount of space left on the msgq
1512          *              if all the writers we have woken up are successful.
1513          *
1514          * msgs_out:    is the number of messages on the message queue if
1515          *              all the writers we have woken up are successful.
1516          */
1517 
1518         smallest = MSG_SMALL_INIT;
1519         msgs_out = qp->msg_qnum;
1520         while (ptr) {
1521                 ptr = list_next(&qp->msg_wait_rcv, ptr);
1522                 if (optr->msgw_snd_size <= avail) {
1523                         list_remove(&qp->msg_wait_rcv, optr);
1524                         avail -= optr->msgw_snd_size;
1525                         cv_signal(&optr->msgw_wake_cv);
1526                         msgs_out++;
1527                         if (msgs_out == qp->msg_qmax ||
1528                             avail < qp->msg_snd_smallest)
1529                                 break;
1530                 } else {
1531                         if (smallest > optr->msgw_snd_size)
1532                                 smallest = optr->msgw_snd_size;
1533                 }
1534                 optr = ptr;
1535         }
1536 
1537         /*
1538          * Reset the smallest message size if the entire list has been visited
1539          */
1540         if (ptr == NULL && smallest != MSG_SMALL_INIT)
1541                 qp->msg_snd_smallest = smallest;
1542 }
1543 
#ifdef  _SYSCALL32_IMPL
/*
 * msgsys32 - System entry point for the msgget, msgctl, msgrcv, msgsnd,
 * msgids and msgsnap system calls for 32-bit callers on LP64 kernel.
 *
 * opcode selects the operation; a1..a5 are its raw 32-bit arguments.
 * Pointers are widened through uintptr_t.  The msgrcv and msgsnap message
 * types, and the msgsnd size, are first cast to int32_t so that they are
 * sign-extended.  The handler's return value is passed back, converted to
 * ssize32_t.  An unknown opcode fails with EINVAL.
 */
static ssize32_t
msgsys32(int opcode, uint32_t a1, uint32_t a2, uint32_t a3,
        uint32_t a4, uint32_t a5)
{
        switch (opcode) {
        case MSGGET:
                return (msgget((key_t)a1, (int)a2));
        case MSGCTL:
                return (msgctl((int)a1, (int)a2, (void *)(uintptr_t)a3));
        case MSGRCV:
                return (msgrcv((int)a1, (struct ipcmsgbuf *)(uintptr_t)a2,
                    (size_t)a3, (long)(int32_t)a4, (int)a5));
        case MSGSND:
                return (msgsnd((int)a1, (struct ipcmsgbuf *)(uintptr_t)a2,
                    (size_t)(int32_t)a3, (int)a4));
        case MSGIDS:
                return (msgids((int *)(uintptr_t)a1, (uint_t)a2,
                    (uint_t *)(uintptr_t)a3));
        case MSGSNAP:
                return (msgsnap((int)a1, (caddr_t)(uintptr_t)a2, (size_t)a3,
                    (long)(int32_t)a4));
        default:
                return (set_errno(EINVAL));
        }
}
#endif  /* _SYSCALL32_IMPL */