1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright 2010 Sun Microsystems, Inc.  All rights reserved.
  23  * Use is subject to license terms.
  24  */
  25 
  26 /*      Copyright (c) 1984, 1986, 1987, 1988, 1989 AT&T     */
  27 /*        All Rights Reserved   */
  28 
  29 
  30 /*
  31  * Inter-Process Communication Message Facility.
  32  *
  33  * See os/ipc.c for a description of common IPC functionality.
  34  *
  35  * Resource controls
  36  * -----------------
  37  *
  38  * Control:      zone.max-msg-ids (rc_zone_msgmni)
  39  * Description:  Maximum number of message queue ids allowed a zone.
  40  *
  41  *   When msgget() is used to allocate a message queue, one id is
  42  *   allocated.  If the id allocation doesn't succeed, msgget() fails
  43  *   and errno is set to ENOSPC.  Upon successful msgctl(, IPC_RMID)
  44  *   the id is deallocated.
  45  *
  46  * Control:      project.max-msg-ids (rc_project_msgmni)
  47  * Description:  Maximum number of message queue ids allowed a project.
  48  *
  49  *   When msgget() is used to allocate a message queue, one id is
  50  *   allocated.  If the id allocation doesn't succeed, msgget() fails
  51  *   and errno is set to ENOSPC.  Upon successful msgctl(, IPC_RMID)
  52  *   the id is deallocated.
  53  *
  54  * Control:      process.max-msg-qbytes (rc_process_msgmnb)
  55  * Description:  Maximum number of bytes of messages on a message queue.
  56  *
  57  *   When msgget() successfully allocates a message queue, the minimum
  58  *   enforced value of this limit is used to initialize msg_qbytes.
  59  *
  60  * Control:      process.max-msg-messages (rc_process_msgtql)
  61  * Description:  Maximum number of messages on a message queue.
  62  *
  63  *   When msgget() successfully allocates a message queue, the minimum
  64  *   enforced value of this limit is used to initialize a per-queue
  65  *   limit on the number of messages.
  66  */
  67 
  68 #include <sys/types.h>
  69 #include <sys/t_lock.h>
  70 #include <sys/param.h>
  71 #include <sys/cred.h>
  72 #include <sys/user.h>
  73 #include <sys/proc.h>
  74 #include <sys/time.h>
  75 #include <sys/ipc.h>
  76 #include <sys/ipc_impl.h>
  77 #include <sys/msg.h>
  78 #include <sys/msg_impl.h>
  79 #include <sys/list.h>
  80 #include <sys/systm.h>
  81 #include <sys/sysmacros.h>
  82 #include <sys/cpuvar.h>
  83 #include <sys/kmem.h>
  84 #include <sys/ddi.h>
  85 #include <sys/errno.h>
  86 #include <sys/cmn_err.h>
  87 #include <sys/debug.h>
  88 #include <sys/project.h>
  89 #include <sys/modctl.h>
  90 #include <sys/syscall.h>
  91 #include <sys/policy.h>
  92 #include <sys/zone.h>
  93 
  94 #include <c2/audit.h>
  95 
  96 /*
  97  * The following tunables are obsolete.  Though for compatibility we
  98  * still read and interpret msginfo_msgmnb, msginfo_msgmni, and
  99  * msginfo_msgtql (see os/project.c and os/rctl_proc.c), the preferred
 100  * mechanism for administrating the IPC Message facility is through the
 101  * resource controls described at the top of this file.
 102  */
 103 size_t  msginfo_msgmax = 2048;  /* (obsolete) */
 104 size_t  msginfo_msgmnb = 4096;  /* (obsolete) */
 105 int     msginfo_msgmni = 50;    /* (obsolete) */
 106 int     msginfo_msgtql = 40;    /* (obsolete) */
 107 int     msginfo_msgssz = 8;     /* (obsolete) */
 108 int     msginfo_msgmap = 0;     /* (obsolete) */
 109 ushort_t msginfo_msgseg = 1024; /* (obsolete) */
 110 
 111 extern rctl_hndl_t rc_zone_msgmni;
 112 extern rctl_hndl_t rc_project_msgmni;
 113 extern rctl_hndl_t rc_process_msgmnb;
 114 extern rctl_hndl_t rc_process_msgtql;
 115 static ipc_service_t *msq_svc;
 116 static zone_key_t msg_zone_key;
 117 
 118 static void msg_dtor(kipc_perm_t *);
 119 static void msg_rmid(kipc_perm_t *);
 120 static void msg_remove_zone(zoneid_t, void *);
 121 
 122 /*
 123  * Module linkage information for the kernel.
 124  */
 125 static ssize_t msgsys(int opcode, uintptr_t a0, uintptr_t a1, uintptr_t a2,
 126         uintptr_t a4, uintptr_t a5);
 127 
 128 static struct sysent ipcmsg_sysent = {
 129         6,
 130 #ifdef  _LP64
 131         SE_ARGC | SE_NOUNLOAD | SE_64RVAL,
 132 #else
 133         SE_ARGC | SE_NOUNLOAD | SE_32RVAL1,
 134 #endif
 135         (int (*)())msgsys
 136 };
 137 
 138 #ifdef  _SYSCALL32_IMPL
 139 static ssize32_t msgsys32(int opcode, uint32_t a0, uint32_t a1, uint32_t a2,
 140         uint32_t a4, uint32_t a5);
 141 
 142 static struct sysent ipcmsg_sysent32 = {
 143         6,
 144         SE_ARGC | SE_NOUNLOAD | SE_32RVAL1,
 145         (int (*)())msgsys32
 146 };
 147 #endif  /* _SYSCALL32_IMPL */
 148 
 149 static struct modlsys modlsys = {
 150         &mod_syscallops, "System V message facility", &ipcmsg_sysent
 151 };
 152 
 153 #ifdef _SYSCALL32_IMPL
 154 static struct modlsys modlsys32 = {
 155         &mod_syscallops32, "32-bit System V message facility", &ipcmsg_sysent32
 156 };
 157 #endif
 158 
 159 /*
 160  *      Big Theory statement for message queue correctness
 161  *
 * The msgrcv and msgsnd functions no longer use cv_broadcast to wake up
 * receivers who are waiting for an event.  Using the cv_broadcast method
 * resulted in negative scaling when the number of waiting receivers is large
 * (the thundering herd problem).  Instead, the receivers waiting to receive a
 * message are now linked in a queue-like fashion and awakened one at a time in
 * a controlled manner.
 168  *
 169  * Receivers can block on two different classes of waiting list:
 170  *    1) "sendwait" list, which is the more complex list of the two.  The
 171  *        receiver will be awakened by a sender posting a new message.  There
 172  *        are two types of "sendwait" list used:
 173  *              a) msg_wait_snd: handles all receivers who are looking for
 174  *                 a message type >= 0, but was unable to locate a match.
 175  *
 176  *                 slot 0: reserved for receivers that have designated they
 177  *                         will take any message type.
 178  *                 rest:   consist of receivers requesting a specific type
 179  *                         but the type was not present.  The entries are
 180  *                         hashed into a bucket in an attempt to keep
 181  *                         any list search relatively short.
 182  *              b) msg_wait_snd_ngt: handles all receivers that have designated
 183  *                 a negative message type. Unlike msg_wait_snd, the hash bucket
 184  *                 serves a range of negative message types (-1 to -5, -6 to -10
 185  *                 and so forth), where the last bucket is reserved for all the
 186  *                 negative message types that hash outside of MSG_MAX_QNUM - 1.
 187  *                 This is done this way to simplify the operation of locating a
 188  *                 negative message type.
 189  *
 190  *    2) "copyout" list, where the receiver is awakened by another
 191  *       receiver after a message is copied out.  This is a linked list
 192  *       of waiters that are awakened one at a time.  Although the solution is
 193  *       not optimal, the complexity that would be added in for waking
 194  *       up the right entry far exceeds any potential pay back (too many
 195  *       correctness and corner case issues).
 196  *
 197  * The lists are doubly linked.  In the case of the "sendwait"
 198  * list, this allows the thread to remove itself from the list without having
 199  * to traverse the list.  In the case of the "copyout" list it simply allows
 200  * us to use common functions with the "sendwait" list.
 201  *
 202  * To make sure receivers are not hung out to dry, we must guarantee:
 203  *    1. If any queued message matches any receiver, then at least one
 204  *       matching receiver must be processing the request.
 *    2. Blocking on the copyout queue is only temporary while messages
 *       are being copied out.  The process is guaranteed to wake up
 *       when it gets to the front of the queue (copyout is a FIFO).
 208  *
 209  * Rules for blocking and waking up:
 210  *   1. A receiver entering msgrcv must examine all messages for a match
 211  *      before blocking on a sendwait queue.
 *   2. If the receiver blocks because the message it chose is already
 *      being copied out, then when it wakes up it needs to start
 *      checking the messages from the beginning.
 215  *   3) When ever a process returns from msgrcv for any reason, if it
 216  *      had attempted to copy a message or blocked waiting for a copy
 217  *      to complete it needs to wakeup the next receiver blocked on
 218  *      a copy out.
 219  *   4) When a message is sent, the sender selects a process waiting
 220  *      for that type of message.  This selection process rotates between
 221  *      receivers types of 0, negative and positive to prevent starvation of
 222  *      any one particular receiver type.
 223  *   5) The following are the scenarios for processes that are awakened
 224  *      by a msgsnd:
 225  *              a) The process finds the message and is able to copy
 226  *                 it out.  Once complete, the process returns.
 227  *              b) The message that was sent that triggered the wakeup is no
 228  *                 longer available (another process found the message first).
 229  *                 We issue a wakeup on copy queue and then go back to
 230  *                 sleep waiting for another matching message to be sent.
 231  *              c) The message that was supposed to be processed was
 232  *                 already serviced by another process.  However a different
 233  *                 message is present which we can service.  The message
 234  *                 is copied and the process returns.
 235  *              d) The message is found, but some sort of error occurs that
 236  *                 prevents the message from being copied.  The receiver
 237  *                 wakes up the next sender that can service this message
 238  *                 type and returns an error to the caller.
 239  *              e) The message is found, but it is marked as being copied
 240  *                 out.  The receiver then goes to sleep on the copyout
 241  *                 queue where it will be awakened again sometime in the future.
 242  *
 243  *
 244  *   6) Whenever a message is found that matches the message type designated,
 245  *      but is being copied out we have to block on the copyout queue.
 246  *      After process copying finishes the copy out, it  must wakeup (either
 247  *      directly or indirectly) all receivers who blocked on its copyout,
 248  *      so they are guaranteed a chance to examine the remaining messages.
 249  *      This is implemented via a chain of wakeups: Y wakes X, who wakes Z,
 250  *      and so on.  The chain cannot be broken.  This leads to the following
 251  *      cases:
 *              a) When a receiver finishes copying the message (or
 *                 encounters an error), the first entry on the copyout
 *                 queue is woken up.
 255  *              b) When the receiver is woken up, it attempts to locate
 256  *                 a message type match.
 257  *              c) If a message type is found and
 258  *                      -- MSG_RCVCOPY flag is not set, the message is
 259  *                         marked for copying out.  Regardless of the copyout
 260  *                         success the next entry on the copyout queue is
 261  *                         awakened and the operation is completed.
 262  *                      -- MSG_RCVCOPY is set, we simply go back to sleep again
 263  *                         on the copyout queue.
 264  *              d) If the message type is not found then we wakeup the next
 265  *                 process on the copyout queue.
 *   7) If a msgsnd is unable to complete for any of the following reasons
 *        a) the msgq has no space for the message
 *        b) the maximum number of messages allowed has been reached
 *      then one of two things happens:
 *        1) If the passed in msg_flag has IPC_NOWAIT set, then
 *           an error is returned.
 *        2) If the IPC_NOWAIT bit is not set in msg_flag, then
 *           the thread is placed to sleep until the request can be
 *           serviced.
 275  *   8) When waking a thread waiting to send a message, a check is done to
 276  *      verify that the operation being asked for by the thread will complete.
 277  *      This decision making process is done in a loop where the oldest request
 278  *      is checked first. The search will continue until there is no more
 279  *      room on the msgq or we have checked all the waiters.
 280  */
 281 
 282 static uint_t msg_type_hash(long);
 283 static int msgq_check_err(kmsqid_t *qp, int cvres);
 284 static int msg_rcvq_sleep(list_t *, msgq_wakeup_t *, kmutex_t **,
 285     kmsqid_t *);
 286 static int msg_copyout(kmsqid_t *, long, kmutex_t **, size_t *, size_t,
 287     struct msg *, struct ipcmsgbuf *, int);
 288 static void msg_rcvq_wakeup_all(list_t *);
 289 static void msg_wakeup_senders(kmsqid_t *);
 290 static void msg_wakeup_rdr(kmsqid_t *, msg_select_t **, long);
 291 static msgq_wakeup_t *msg_fnd_any_snd(kmsqid_t *, int, long);
 292 static msgq_wakeup_t *msg_fnd_any_rdr(kmsqid_t *, int, long);
 293 static msgq_wakeup_t *msg_fnd_neg_snd(kmsqid_t *, int, long);
 294 static msgq_wakeup_t *msg_fnd_spc_snd(kmsqid_t *, int, long);
 295 static struct msg *msgrcv_lookup(kmsqid_t *, long);
 296 
 297 msg_select_t msg_fnd_sndr[] = {
 298         { msg_fnd_any_snd, &msg_fnd_sndr[1] },
 299         { msg_fnd_spc_snd, &msg_fnd_sndr[2] },
 300         { msg_fnd_neg_snd, &msg_fnd_sndr[0] }
 301 };
 302 
 303 msg_select_t msg_fnd_rdr[1] = {
 304         { msg_fnd_any_rdr, &msg_fnd_rdr[0] },
 305 };
 306 
 307 static struct modlinkage modlinkage = {
 308         MODREV_1,
 309         {   &modlsys,
 310 #ifdef _SYSCALL32_IMPL
 311             &modlsys32,
 312 #endif
 313             NULL
 314         }
 315 };
 316 
 317 #define MSG_SMALL_INIT (size_t)-1
 318 int
 319 _init(void)
 320 {
 321         int result;
 322 
 323         msq_svc = ipcs_create("msqids", rc_project_msgmni, rc_zone_msgmni,
 324             sizeof (kmsqid_t), msg_dtor, msg_rmid, AT_IPC_MSG,
 325             offsetof(ipc_rqty_t, ipcq_msgmni));
 326         zone_key_create(&msg_zone_key, NULL, msg_remove_zone, NULL);
 327 
 328         if ((result = mod_install(&modlinkage)) == 0)
 329                 return (0);
 330 
 331         (void) zone_key_delete(msg_zone_key);
 332         ipcs_destroy(msq_svc);
 333 
 334         return (result);
 335 }
 336 
 337 int
 338 _fini(void)
 339 {
 340         return (EBUSY);
 341 }
 342 
 343 int
 344 _info(struct modinfo *modinfop)
 345 {
 346         return (mod_info(&modlinkage, modinfop));
 347 }
 348 
 349 static void
 350 msg_dtor(kipc_perm_t *perm)
 351 {
 352         kmsqid_t *qp = (kmsqid_t *)perm;
 353         int             ii;
 354 
 355         for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
 356                 ASSERT(list_is_empty(&qp->msg_wait_snd[ii]));
 357                 ASSERT(list_is_empty(&qp->msg_wait_snd_ngt[ii]));
 358                 list_destroy(&qp->msg_wait_snd[ii]);
 359                 list_destroy(&qp->msg_wait_snd_ngt[ii]);
 360         }
 361         ASSERT(list_is_empty(&qp->msg_cpy_block));
 362         ASSERT(list_is_empty(&qp->msg_wait_rcv));
 363         list_destroy(&qp->msg_cpy_block);
 364         ASSERT(qp->msg_snd_cnt == 0);
 365         ASSERT(qp->msg_cbytes == 0);
 366         list_destroy(&qp->msg_list);
 367         list_destroy(&qp->msg_wait_rcv);
 368 }
 369 
 370 
 371 #define msg_hold(mp)    (mp)->msg_copycnt++
 372 
 373 /*
 374  * msg_rele - decrement the reference count on the message.  When count
 375  * reaches zero, free message header and contents.
 376  */
 377 static void
 378 msg_rele(struct msg *mp)
 379 {
 380         ASSERT(mp->msg_copycnt > 0);
 381         if (mp->msg_copycnt-- == 1) {
 382                 if (mp->msg_addr)
 383                         kmem_free(mp->msg_addr, mp->msg_size);
 384                 kmem_free(mp, sizeof (struct msg));
 385         }
 386 }
 387 
 388 /*
 389  * msgunlink - Unlink msg from queue, decrement byte count and wake up anyone
 390  * waiting for free bytes on queue.
 391  *
 392  * Called with queue locked.
 393  */
 394 static void
 395 msgunlink(kmsqid_t *qp, struct msg *mp)
 396 {
 397         list_remove(&qp->msg_list, mp);
 398         qp->msg_qnum--;
 399         qp->msg_cbytes -= mp->msg_size;
 400         msg_rele(mp);
 401 
 402         /* Wake up waiting writers */
 403         msg_wakeup_senders(qp);
 404 }
 405 
 406 static void
 407 msg_rmid(kipc_perm_t *perm)
 408 {
 409         kmsqid_t *qp = (kmsqid_t *)perm;
 410         struct msg *mp;
 411         int             ii;
 412 
 413 
 414         while ((mp = list_head(&qp->msg_list)) != NULL)
 415                 msgunlink(qp, mp);
 416         ASSERT(qp->msg_cbytes == 0);
 417 
 418         /*
 419          * Wake up everyone who is in a wait state of some sort
 420          * for this message queue.
 421          */
 422         for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
 423                 msg_rcvq_wakeup_all(&qp->msg_wait_snd[ii]);
 424                 msg_rcvq_wakeup_all(&qp->msg_wait_snd_ngt[ii]);
 425         }
 426         msg_rcvq_wakeup_all(&qp->msg_cpy_block);
 427         msg_rcvq_wakeup_all(&qp->msg_wait_rcv);
 428 }
 429 
 430 /*
 431  * msgctl system call.
 432  *
 433  * gets q lock (via ipc_lookup), releases before return.
 434  * may call users of msg_lock
 435  */
 436 static int
 437 msgctl(int msgid, int cmd, void *arg)
 438 {
 439         STRUCT_DECL(msqid_ds, ds);              /* SVR4 queue work area */
 440         kmsqid_t                *qp;            /* ptr to associated q */
 441         int                     error;
 442         struct  cred            *cr;
 443         model_t mdl = get_udatamodel();
 444         struct msqid_ds64       ds64;
 445         kmutex_t                *lock;
 446         proc_t                  *pp = curproc;
 447 
 448         STRUCT_INIT(ds, mdl);
 449         cr = CRED();
 450 
 451         /*
 452          * Perform pre- or non-lookup actions (e.g. copyins, RMID).
 453          */
 454         switch (cmd) {
 455         case IPC_SET:
 456                 if (copyin(arg, STRUCT_BUF(ds), STRUCT_SIZE(ds)))
 457                         return (set_errno(EFAULT));
 458                 break;
 459 
 460         case IPC_SET64:
 461                 if (copyin(arg, &ds64, sizeof (struct msqid_ds64)))
 462                         return (set_errno(EFAULT));
 463                 break;
 464 
 465         case IPC_RMID:
 466                 if (error = ipc_rmid(msq_svc, msgid, cr))
 467                         return (set_errno(error));
 468                 return (0);
 469         }
 470 
 471         /*
 472          * get msqid_ds for this msgid
 473          */
 474         if ((lock = ipc_lookup(msq_svc, msgid, (kipc_perm_t **)&qp)) == NULL)
 475                 return (set_errno(EINVAL));
 476 
 477         switch (cmd) {
 478         case IPC_SET:
 479                 if (STRUCT_FGET(ds, msg_qbytes) > qp->msg_qbytes &&
 480                     secpolicy_ipc_config(cr) != 0) {
 481                         mutex_exit(lock);
 482                         return (set_errno(EPERM));
 483                 }
 484                 if (error = ipcperm_set(msq_svc, cr, &qp->msg_perm,
 485                     &STRUCT_BUF(ds)->msg_perm, mdl)) {
 486                         mutex_exit(lock);
 487                         return (set_errno(error));
 488                 }
 489                 qp->msg_qbytes = STRUCT_FGET(ds, msg_qbytes);
 490                 qp->msg_ctime = gethrestime_sec();
 491                 break;
 492 
 493         case IPC_STAT:
 494                 if (error = ipcperm_access(&qp->msg_perm, MSG_R, cr)) {
 495                         mutex_exit(lock);
 496                         return (set_errno(error));
 497                 }
 498 
 499                 if (qp->msg_rcv_cnt)
 500                         qp->msg_perm.ipc_mode |= MSG_RWAIT;
 501                 if (qp->msg_snd_cnt)
 502                         qp->msg_perm.ipc_mode |= MSG_WWAIT;
 503                 ipcperm_stat(&STRUCT_BUF(ds)->msg_perm, &qp->msg_perm, mdl);
 504                 qp->msg_perm.ipc_mode &= ~(MSG_RWAIT|MSG_WWAIT);
 505                 STRUCT_FSETP(ds, msg_first, NULL);      /* kernel addr */
 506                 STRUCT_FSETP(ds, msg_last, NULL);
 507                 STRUCT_FSET(ds, msg_cbytes, qp->msg_cbytes);
 508                 STRUCT_FSET(ds, msg_qnum, qp->msg_qnum);
 509                 STRUCT_FSET(ds, msg_qbytes, qp->msg_qbytes);
 510                 STRUCT_FSET(ds, msg_lspid, qp->msg_lspid);
 511                 STRUCT_FSET(ds, msg_lrpid, qp->msg_lrpid);
 512                 STRUCT_FSET(ds, msg_stime, qp->msg_stime);
 513                 STRUCT_FSET(ds, msg_rtime, qp->msg_rtime);
 514                 STRUCT_FSET(ds, msg_ctime, qp->msg_ctime);
 515                 break;
 516 
 517         case IPC_SET64:
 518                 mutex_enter(&pp->p_lock);
 519                 if ((ds64.msgx_qbytes > qp->msg_qbytes) &&
 520                     secpolicy_ipc_config(cr) != 0 &&
 521                     rctl_test(rc_process_msgmnb, pp->p_rctls, pp,
 522                     ds64.msgx_qbytes, RCA_SAFE) & RCT_DENY) {
 523                         mutex_exit(&pp->p_lock);
 524                         mutex_exit(lock);
 525                         return (set_errno(EPERM));
 526                 }
 527                 mutex_exit(&pp->p_lock);
 528                 if (error = ipcperm_set64(msq_svc, cr, &qp->msg_perm,
 529                     &ds64.msgx_perm)) {
 530                         mutex_exit(lock);
 531                         return (set_errno(error));
 532                 }
 533                 qp->msg_qbytes = ds64.msgx_qbytes;
 534                 qp->msg_ctime = gethrestime_sec();
 535                 break;
 536 
 537         case IPC_STAT64:
 538                 if (qp->msg_rcv_cnt)
 539                         qp->msg_perm.ipc_mode |= MSG_RWAIT;
 540                 if (qp->msg_snd_cnt)
 541                         qp->msg_perm.ipc_mode |= MSG_WWAIT;
 542                 ipcperm_stat64(&ds64.msgx_perm, &qp->msg_perm);
 543                 qp->msg_perm.ipc_mode &= ~(MSG_RWAIT|MSG_WWAIT);
 544                 ds64.msgx_cbytes = qp->msg_cbytes;
 545                 ds64.msgx_qnum = qp->msg_qnum;
 546                 ds64.msgx_qbytes = qp->msg_qbytes;
 547                 ds64.msgx_lspid = qp->msg_lspid;
 548                 ds64.msgx_lrpid = qp->msg_lrpid;
 549                 ds64.msgx_stime = qp->msg_stime;
 550                 ds64.msgx_rtime = qp->msg_rtime;
 551                 ds64.msgx_ctime = qp->msg_ctime;
 552                 break;
 553 
 554         default:
 555                 mutex_exit(lock);
 556                 return (set_errno(EINVAL));
 557         }
 558 
 559         mutex_exit(lock);
 560 
 561         /*
 562          * Do copyout last (after releasing mutex).
 563          */
 564         switch (cmd) {
 565         case IPC_STAT:
 566                 if (copyout(STRUCT_BUF(ds), arg, STRUCT_SIZE(ds)))
 567                         return (set_errno(EFAULT));
 568                 break;
 569 
 570         case IPC_STAT64:
 571                 if (copyout(&ds64, arg, sizeof (struct msqid_ds64)))
 572                         return (set_errno(EFAULT));
 573                 break;
 574         }
 575 
 576         return (0);
 577 }
 578 
 579 /*
 580  * Remove all message queues associated with a given zone.  Called by
 581  * zone_shutdown when the zone is halted.
 582  */
 583 /*ARGSUSED1*/
 584 static void
 585 msg_remove_zone(zoneid_t zoneid, void *arg)
 586 {
 587         ipc_remove_zone(msq_svc, zoneid);
 588 }
 589 
 590 /*
 591  * msgget system call.
 592  */
 593 static int
 594 msgget(key_t key, int msgflg)
 595 {
 596         kmsqid_t        *qp;
 597         kmutex_t        *lock;
 598         int             id, error;
 599         int             ii;
 600         proc_t          *pp = curproc;
 601 
 602 top:
 603         if (error = ipc_get(msq_svc, key, msgflg, (kipc_perm_t **)&qp, &lock))
 604                 return (set_errno(error));
 605 
 606         if (IPC_FREE(&qp->msg_perm)) {
 607                 mutex_exit(lock);
 608                 mutex_exit(&pp->p_lock);
 609 
 610                 list_create(&qp->msg_list, sizeof (struct msg),
 611                     offsetof(struct msg, msg_node));
 612                 qp->msg_qnum = 0;
 613                 qp->msg_lspid = qp->msg_lrpid = 0;
 614                 qp->msg_stime = qp->msg_rtime = 0;
 615                 qp->msg_ctime = gethrestime_sec();
 616                 qp->msg_ngt_cnt = 0;
 617                 qp->msg_neg_copy = 0;
 618                 for (ii = 0; ii <= MSG_MAX_QNUM; ii++) {
 619                         list_create(&qp->msg_wait_snd[ii],
 620                             sizeof (msgq_wakeup_t),
 621                             offsetof(msgq_wakeup_t, msgw_list));
 622                         list_create(&qp->msg_wait_snd_ngt[ii],
 623                             sizeof (msgq_wakeup_t),
 624                             offsetof(msgq_wakeup_t, msgw_list));
 625                 }
 626                 /*
 627                  * The proper initialization of msg_lowest_type is to the
 628                  * highest possible value.  By doing this we guarantee that
 629                  * when the first send happens, the lowest type will be set
 630                  * properly.
 631                  */
 632                 qp->msg_lowest_type = MSG_SMALL_INIT;
 633                 list_create(&qp->msg_cpy_block,
 634                     sizeof (msgq_wakeup_t),
 635                     offsetof(msgq_wakeup_t, msgw_list));
 636                 list_create(&qp->msg_wait_rcv,
 637                     sizeof (msgq_wakeup_t),
 638                     offsetof(msgq_wakeup_t, msgw_list));
 639                 qp->msg_fnd_sndr = &msg_fnd_sndr[0];
 640                 qp->msg_fnd_rdr = &msg_fnd_rdr[0];
 641                 qp->msg_rcv_cnt = 0;
 642                 qp->msg_snd_cnt = 0;
 643                 qp->msg_snd_smallest = MSG_SMALL_INIT;
 644 
 645                 if (error = ipc_commit_begin(msq_svc, key, msgflg,
 646                     (kipc_perm_t *)qp)) {
 647                         if (error == EAGAIN)
 648                                 goto top;
 649                         return (set_errno(error));
 650                 }
 651                 qp->msg_qbytes = rctl_enforced_value(rc_process_msgmnb,
 652                     pp->p_rctls, pp);
 653                 qp->msg_qmax = rctl_enforced_value(rc_process_msgtql,
 654                     pp->p_rctls, pp);
 655                 lock = ipc_commit_end(msq_svc, &qp->msg_perm);
 656         }
 657 
 658         if (AU_AUDITING())
 659                 audit_ipcget(AT_IPC_MSG, (void *)qp);
 660 
 661         id = qp->msg_perm.ipc_id;
 662         mutex_exit(lock);
 663         return (id);
 664 }
 665 
 666 static ssize_t
 667 msgrcv(int msqid, struct ipcmsgbuf *msgp, size_t msgsz, long msgtyp, int msgflg)
 668 {
 669         struct msg      *smp;   /* ptr to best msg on q */
 670         kmsqid_t        *qp;    /* ptr to associated q */
 671         kmutex_t        *lock;
 672         size_t          xtsz;   /* transfer byte count */
 673         int             error = 0;
 674         int             cvres;
 675         uint_t          msg_hash;
 676         msgq_wakeup_t   msg_entry;
 677 
 678         CPU_STATS_ADDQ(CPU, sys, msg, 1);       /* bump msg send/rcv count */
 679 
 680         msg_hash = msg_type_hash(msgtyp);
 681         if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL) {
 682                 return ((ssize_t)set_errno(EINVAL));
 683         }
 684         ipc_hold(msq_svc, (kipc_perm_t *)qp);
 685 
 686         if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED())) {
 687                 goto msgrcv_out;
 688         }
 689 
 690         /*
 691          * Various information (including the condvar_t) required for the
 692          * process to sleep is provided by it's stack.
 693          */
 694         msg_entry.msgw_thrd = curthread;
 695         msg_entry.msgw_snd_wake = 0;
 696         msg_entry.msgw_type = msgtyp;
 697 findmsg:
 698         smp = msgrcv_lookup(qp, msgtyp);
 699 
 700         if (smp) {
 701                 /*
 702                  * We found a possible message to copy out.
 703                  */
 704                 if ((smp->msg_flags & MSG_RCVCOPY) == 0) {
 705                         long t = msg_entry.msgw_snd_wake;
 706                         long copy_type = smp->msg_type;
 707 
 708                         /*
 709                          * It is available, attempt to copy it.
 710                          */
 711                         error = msg_copyout(qp, msgtyp, &lock, &xtsz, msgsz,
 712                             smp, msgp, msgflg);
 713 
 714                         /*
 715                          * It is possible to consume a different message
 716                          * type then what originally awakened for (negative
 717                          * types).  If this happens a check must be done to
 718                          * to determine if another receiver is available
 719                          * for the waking message type,  Failure to do this
 720                          * can result in a message on the queue that can be
 721                          * serviced by a sleeping receiver.
 722                          */
 723                         if (!error && t && (copy_type != t))
 724                                 msg_wakeup_rdr(qp, &qp->msg_fnd_sndr, t);
 725 
 726                         /*
 727                          * Don't forget to wakeup a sleeper that blocked because
 728                          * we were copying things out.
 729                          */
 730                         msg_wakeup_rdr(qp, &qp->msg_fnd_rdr, 0);
 731                         goto msgrcv_out;
 732                 }
 733                 /*
 734                  * The selected message is being copied out, so block.  We do
 735                  * not need to wake the next person up on the msg_cpy_block list
 736                  * due to the fact some one is copying out and they will get
 737                  * things moving again once the copy is completed.
 738                  */
 739                 cvres = msg_rcvq_sleep(&qp->msg_cpy_block,
 740                     &msg_entry, &lock, qp);
 741                 error = msgq_check_err(qp, cvres);
 742                 if (error) {
 743                         goto msgrcv_out;
 744                 }
 745                 goto findmsg;
 746         }
 747         /*
 748          * There isn't a message to copy out that matches the designated
 749          * criteria.
 750          */
 751         if (msgflg & IPC_NOWAIT) {
 752                 error = ENOMSG;
 753                 goto msgrcv_out;
 754         }
 755         msg_wakeup_rdr(qp,  &qp->msg_fnd_rdr, 0);
 756 
 757         /*
 758          * Wait for new message.  We keep the negative and positive types
 759          * separate for performance reasons.
 760          */
 761         msg_entry.msgw_snd_wake = 0;
 762         if (msgtyp >= 0) {
 763                 cvres = msg_rcvq_sleep(&qp->msg_wait_snd[msg_hash],
 764                     &msg_entry, &lock, qp);
 765         } else {
 766                 qp->msg_ngt_cnt++;
 767                 cvres = msg_rcvq_sleep(&qp->msg_wait_snd_ngt[msg_hash],
 768                     &msg_entry, &lock, qp);
 769                 qp->msg_ngt_cnt--;
 770         }
 771 
 772         if (!(error = msgq_check_err(qp, cvres))) {
 773                 goto findmsg;
 774         }
 775 
 776 msgrcv_out:
 777         if (error) {
 778                 msg_wakeup_rdr(qp,  &qp->msg_fnd_rdr, 0);
 779                 if (msg_entry.msgw_snd_wake) {
 780                         msg_wakeup_rdr(qp, &qp->msg_fnd_sndr,
 781                             msg_entry.msgw_snd_wake);
 782                 }
 783                 ipc_rele(msq_svc, (kipc_perm_t *)qp);
 784                 return ((ssize_t)set_errno(error));
 785         }
 786         ipc_rele(msq_svc, (kipc_perm_t *)qp);
 787         return ((ssize_t)xtsz);
 788 }
 789 
 790 static int
 791 msgq_check_err(kmsqid_t *qp, int cvres)
 792 {
 793         if (IPC_FREE(&qp->msg_perm)) {
 794                 return (EIDRM);
 795         }
 796 
 797         if (cvres == 0) {
 798                 return (EINTR);
 799         }
 800 
 801         return (0);
 802 }
 803 
 804 static int
 805 msg_copyout(kmsqid_t *qp, long msgtyp, kmutex_t **lock, size_t *xtsz_ret,
 806     size_t msgsz, struct msg *smp, struct ipcmsgbuf *msgp, int msgflg)
 807 {
 808         size_t          xtsz;
 809         STRUCT_HANDLE(ipcmsgbuf, umsgp);
 810         model_t         mdl = get_udatamodel();
 811         int             copyerror = 0;
 812 
 813         STRUCT_SET_HANDLE(umsgp, mdl, msgp);
 814         if (msgsz < smp->msg_size) {
 815                 if ((msgflg & MSG_NOERROR) == 0) {
 816                         return (E2BIG);
 817                 } else {
 818                         xtsz = msgsz;
 819                 }
 820         } else {
 821                 xtsz = smp->msg_size;
 822         }
 823         *xtsz_ret = xtsz;
 824 
 825         /*
 826          * To prevent a DOS attack we mark the message as being
 827          * copied out and release mutex.  When the copy is completed
 828          * we need to acquire the mutex and make the appropriate updates.
 829          */
 830         ASSERT((smp->msg_flags & MSG_RCVCOPY) == 0);
 831         smp->msg_flags |= MSG_RCVCOPY;
 832         msg_hold(smp);
 833         if (msgtyp < 0) {
 834                 ASSERT(qp->msg_neg_copy == 0);
 835                 qp->msg_neg_copy = 1;
 836         }
 837         mutex_exit(*lock);
 838 
 839         if (mdl == DATAMODEL_NATIVE) {
 840                 copyerror = copyout(&smp->msg_type, msgp,
 841                     sizeof (smp->msg_type));
 842         } else {
 843                 /*
 844                  * 32-bit callers need an imploded msg type.
 845                  */
 846                 int32_t msg_type32 = smp->msg_type;
 847 
 848                 copyerror = copyout(&msg_type32, msgp,
 849                     sizeof (msg_type32));
 850         }
 851 
 852         if (copyerror == 0 && xtsz) {
 853                 copyerror = copyout(smp->msg_addr,
 854                     STRUCT_FADDR(umsgp, mtext), xtsz);
 855         }
 856 
 857         /*
 858          * Reclaim the mutex and make sure the message queue still exists.
 859          */
 860 
 861         *lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
 862         if (msgtyp < 0) {
 863                 qp->msg_neg_copy = 0;
 864         }
 865         ASSERT(smp->msg_flags & MSG_RCVCOPY);
 866         smp->msg_flags &= ~MSG_RCVCOPY;
 867         msg_rele(smp);
 868         if (IPC_FREE(&qp->msg_perm)) {
 869                 return (EIDRM);
 870         }
 871         if (copyerror) {
 872                 return (EFAULT);
 873         }
 874         qp->msg_lrpid = ttoproc(curthread)->p_pid;
 875         qp->msg_rtime = gethrestime_sec();
 876         msgunlink(qp, smp);
 877         return (0);
 878 }
 879 
 880 static struct msg *
 881 msgrcv_lookup(kmsqid_t *qp, long msgtyp)
 882 {
 883         struct msg              *smp = NULL;
 884         long                    qp_low;
 885         struct msg              *mp;    /* ptr to msg on q */
 886         long                    low_msgtype;
 887         static struct msg       neg_copy_smp;
 888 
 889         mp = list_head(&qp->msg_list);
 890         if (msgtyp == 0) {
 891                 smp = mp;
 892         } else {
 893                 qp_low = qp->msg_lowest_type;
 894                 if (msgtyp > 0) {
 895                         /*
 896                          * If our lowest possible message type is larger than
 897                          * the message type desired, then we know there is
 898                          * no entry present.
 899                          */
 900                         if (qp_low > msgtyp) {
 901                                 return (NULL);
 902                         }
 903 
 904                         for (; mp; mp = list_next(&qp->msg_list, mp)) {
 905                                 if (msgtyp == mp->msg_type) {
 906                                         smp = mp;
 907                                         break;
 908                                 }
 909                         }
 910                 } else {
 911                         /*
 912                          * We have kept track of the lowest possible message
 913                          * type on the send queue.  This allows us to terminate
 914                          * the search early if we find a message type of that
 915                          * type.  Note, the lowest type may not be the actual
 916                          * lowest value in the system, it is only guaranteed
 917                          * that there isn't a value lower than that.
 918                          */
 919                         low_msgtype = -msgtyp;
 920                         if (low_msgtype < qp_low) {
 921                                 return (NULL);
 922                         }
 923                         if (qp->msg_neg_copy) {
 924                                 neg_copy_smp.msg_flags = MSG_RCVCOPY;
 925                                 return (&neg_copy_smp);
 926                         }
 927                         for (; mp; mp = list_next(&qp->msg_list, mp)) {
 928                                 if (mp->msg_type <= low_msgtype &&
 929                                     !(smp && smp->msg_type <= mp->msg_type)) {
 930                                         smp = mp;
 931                                         low_msgtype = mp->msg_type;
 932                                         if (low_msgtype == qp_low) {
 933                                                 break;
 934                                         }
 935                                 }
 936                         }
 937                         if (smp) {
 938                                 /*
 939                                  * Update the lowest message type.
 940                                  */
 941                                 qp->msg_lowest_type = smp->msg_type;
 942                         }
 943                 }
 944         }
 945         return (smp);
 946 }
 947 
 948 /*
 949  * msgids system call.
 950  */
 951 static int
 952 msgids(int *buf, uint_t nids, uint_t *pnids)
 953 {
 954         int error;
 955 
 956         if (error = ipc_ids(msq_svc, buf, nids, pnids))
 957                 return (set_errno(error));
 958 
 959         return (0);
 960 }
 961 
/*
 * Round a message length up to a multiple of sizeof (size_t) (native
 * callers) or sizeof (size32_t) (32-bit callers).  msgsnap() uses these to
 * compute the space each message's text occupies in the snapshot buffer.
 */
#define RND(x)          roundup((x), sizeof (size_t))
#define RND32(x)        roundup((x), sizeof (size32_t))

/*
 * msgsnap system call.
 *
 * Writes to the user buffer buf (bufsz bytes long) a msgsnap_head followed,
 * for every selected message, by a msgsnap_mhead and the message text; the
 * buffer position is then advanced by the text length rounded with
 * RND()/RND32().  Messages are selected by msgtyp:
 *
 *   msgtyp == 0  every message on the queue
 *   msgtyp >  0  messages whose type equals msgtyp
 *   msgtyp <  0  messages whose type is <= -msgtyp
 *
 * If the space required exceeds bufsz, only the header is copied out, with
 * msgsnap_nmsg set to 0 and msgsnap_size set to the size required.
 *
 * Returns 0 on success.  Otherwise the error is passed through set_errno():
 * EINVAL (bufsz smaller than the header, or ipc_lookup() failed), the error
 * from ipcperm_access(), EFAULT (a copyout failed) or EIDRM (the queue was
 * found freed while a message was being released).
 */
static int
msgsnap(int msqid, caddr_t buf, size_t bufsz, long msgtyp)
{
        struct msg      *mp;    /* ptr to msg on q */
        kmsqid_t        *qp;    /* ptr to associated q */
        kmutex_t        *lock;
        size_t          size;
        size_t          nmsg;
        struct msg      **snaplist;
        int             error, i;
        model_t         mdl = get_udatamodel();
        STRUCT_DECL(msgsnap_head, head);
        STRUCT_DECL(msgsnap_mhead, mhead);

        STRUCT_INIT(head, mdl);
        STRUCT_INIT(mhead, mdl);

        /* The buffer must at least be able to hold the snapshot header. */
        if (bufsz < STRUCT_SIZE(head))
                return (set_errno(EINVAL));

        if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL)
                return (set_errno(EINVAL));

        /* This assignment also leaves error == 0 on the success path. */
        if (error = ipcperm_access(&qp->msg_perm, MSG_R, CRED())) {
                mutex_exit(lock);
                return (set_errno(error));
        }
        /*
         * NOTE(review): presumably this hold keeps qp valid while the lock
         * is dropped for the copyouts below -- see os/ipc.c to confirm.
         */
        ipc_hold(msq_svc, (kipc_perm_t *)qp);

        /*
         * First compute the required buffer size and
         * the number of messages on the queue.
         */
        size = nmsg = 0;
        for (mp = list_head(&qp->msg_list); mp;
            mp = list_next(&qp->msg_list, mp)) {
                if (msgtyp == 0 ||
                    (msgtyp > 0 && msgtyp == mp->msg_type) ||
                    (msgtyp < 0 && mp->msg_type <= -msgtyp)) {
                        nmsg++;
                        if (mdl == DATAMODEL_NATIVE)
                                size += RND(mp->msg_size);
                        else
                                size += RND32(mp->msg_size);
                }
        }

        /*
         * Add the fixed overhead.  If the total does not fit, no messages
         * are copied; the header still reports the size that is needed.
         */
        size += STRUCT_SIZE(head) + nmsg * STRUCT_SIZE(mhead);
        if (size > bufsz)
                nmsg = 0;

        if (nmsg > 0) {
                /*
                 * Mark the messages as being copied.  The lock is still
                 * held from the counting pass, and the selection test is
                 * the same, so exactly nmsg entries are stored.
                 */
                snaplist = (struct msg **)kmem_alloc(nmsg *
                    sizeof (struct msg *), KM_SLEEP);
                i = 0;
                for (mp = list_head(&qp->msg_list); mp;
                    mp = list_next(&qp->msg_list, mp)) {
                        if (msgtyp == 0 ||
                            (msgtyp > 0 && msgtyp == mp->msg_type) ||
                            (msgtyp < 0 && mp->msg_type <= -msgtyp)) {
                                msg_hold(mp);
                                snaplist[i] = mp;
                                i++;
                        }
                }
        }
        mutex_exit(lock);

        /*
         * Copy out the buffer header.
         */
        STRUCT_FSET(head, msgsnap_size, size);
        STRUCT_FSET(head, msgsnap_nmsg, nmsg);
        if (copyout(STRUCT_BUF(head), buf, STRUCT_SIZE(head)))
                error = EFAULT;

        buf += STRUCT_SIZE(head);

        /*
         * Now copy out the messages one by one.  Once an error has been
         * recorded no further copying is done, but the loop keeps running
         * so that the hold on every message in snaplist is dropped.
         */
        for (i = 0; i < nmsg; i++) {
                mp = snaplist[i];
                if (error == 0) {
                        STRUCT_FSET(mhead, msgsnap_mlen, mp->msg_size);
                        STRUCT_FSET(mhead, msgsnap_mtype, mp->msg_type);
                        if (copyout(STRUCT_BUF(mhead), buf, STRUCT_SIZE(mhead)))
                                error = EFAULT;
                        buf += STRUCT_SIZE(mhead);

                        if (error == 0 &&
                            mp->msg_size != 0 &&
                            copyout(mp->msg_addr, buf, mp->msg_size))
                                error = EFAULT;
                        if (mdl == DATAMODEL_NATIVE)
                                buf += RND(mp->msg_size);
                        else
                                buf += RND32(mp->msg_size);
                }
                /* The hold is dropped with the queue lock held. */
                lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
                msg_rele(mp);
                /* Check for msg q deleted or reallocated */
                if (IPC_FREE(&qp->msg_perm))
                        error = EIDRM;
                mutex_exit(lock);
        }

        /*
         * Re-take the lock for ipc_rele(); msgsnd() notes that ipc_rele()
         * drops the lock, so it is not released explicitly here.
         */
        (void) ipc_lock(msq_svc, qp->msg_perm.ipc_id);
        ipc_rele(msq_svc, (kipc_perm_t *)qp);

        if (nmsg > 0)
                kmem_free(snaplist, nmsg * sizeof (struct msg *));

        if (error)
                return (set_errno(error));
        return (0);
}
1088 
/*
 * Largest message size, in bytes, for which msgsnd() allocates the message
 * and copies in its text before looking up (and locking) the queue.
 */
#define MSG_PREALLOC_LIMIT 8192

/*
 * msgsnd system call.
 *
 * Queues a message of msgsz bytes on the queue identified by msqid.  The
 * message type and text are read from the user buffer msgp.
 *
 * Returns 0 on success.  Otherwise the error is passed through set_errno():
 *
 *   EFAULT  copyin of the type or of the text failed
 *   EINVAL  type < 1, ipc_lookup() failed, or msgsz > msg_qbytes
 *   EAGAIN  the queue is full and IPC_NOWAIT is set in msgflg
 *   EIDRM   the queue was found freed after sleeping or after the lock
 *           was re-acquired
 *   EINTR   msgq_check_err() reported an interrupted sleep
 *
 * or whatever error ipcperm_access() returned.
 */
static int
msgsnd(int msqid, struct ipcmsgbuf *msgp, size_t msgsz, int msgflg)
{
        kmsqid_t        *qp;
        kmutex_t        *lock = NULL;
        struct msg      *mp = NULL;
        long            type;
        int             error = 0, wait_wakeup = 0;
        msgq_wakeup_t   msg_entry;
        model_t         mdl = get_udatamodel();
        STRUCT_HANDLE(ipcmsgbuf, umsgp);

        CPU_STATS_ADDQ(CPU, sys, msg, 1);       /* bump msg send/rcv count */
        STRUCT_SET_HANDLE(umsgp, mdl, msgp);

        /*
         * Fetch the message type; 32-bit callers supply an int32_t which
         * is widened to long.
         */
        if (mdl == DATAMODEL_NATIVE) {
                if (copyin(msgp, &type, sizeof (type)))
                        return (set_errno(EFAULT));
        } else {
                int32_t type32;
                if (copyin(msgp, &type32, sizeof (type32)))
                        return (set_errno(EFAULT));
                type = type32;
        }

        /* Only positive message types may be sent. */
        if (type < 1)
                return (set_errno(EINVAL));

        /*
         * We want the value here large enough that most of the
         * message operations will use the "lockless" path,
         * but small enough that a user can not reserve large
         * chunks of kernel memory unless they have a valid
         * reason to.
         */
        if (msgsz <= MSG_PREALLOC_LIMIT) {
                /*
                 * We are small enough that we can afford to do the
                 * allocation now.  This saves dropping the lock
                 * and then reacquiring the lock.
                 */
                mp = kmem_zalloc(sizeof (struct msg), KM_SLEEP);
                mp->msg_copycnt = 1;
                mp->msg_size = msgsz;
                if (msgsz) {
                        mp->msg_addr = kmem_alloc(msgsz, KM_SLEEP);
                        if (copyin(STRUCT_FADDR(umsgp, mtext),
                            mp->msg_addr, msgsz) == -1) {
                                /* lock is still NULL, so only mp is undone */
                                error = EFAULT;
                                goto msgsnd_out;
                        }
                }
        }

        if ((lock = ipc_lookup(msq_svc, msqid, (kipc_perm_t **)&qp)) == NULL) {
                error = EINVAL;
                goto msgsnd_out;
        }

        /*
         * NOTE(review): presumably this hold keeps qp valid across the
         * sleeps and lock drops below -- see os/ipc.c to confirm.  It is
         * released by ipc_rele() at msgsnd_out.
         */
        ipc_hold(msq_svc, (kipc_perm_t *)qp);

        /* A message larger than the queue's byte limit can never fit. */
        if (msgsz > qp->msg_qbytes) {
                error = EINVAL;
                goto msgsnd_out;
        }

        if (error = ipcperm_access(&qp->msg_perm, MSG_W, CRED()))
                goto msgsnd_out;

top:
        /*
         * Allocate space on q, message header, & buffer space.
         */
        ASSERT(qp->msg_qnum <= qp->msg_qmax);
        while ((msgsz > qp->msg_qbytes - qp->msg_cbytes) ||
            (qp->msg_qnum == qp->msg_qmax)) {
                int cvres;

                if (msgflg & IPC_NOWAIT) {
                        error = EAGAIN;
                        goto msgsnd_out;
                }

                /*
                 * Queue ourselves on msg_wait_rcv and sleep.  The size is
                 * recorded in the entry, and folded into msg_snd_smallest,
                 * because msg_wakeup_senders() uses both to decide which
                 * sleeping senders can be woken.
                 */
                wait_wakeup = 0;
                qp->msg_snd_cnt++;
                msg_entry.msgw_snd_size = msgsz;
                msg_entry.msgw_thrd = curthread;
                msg_entry.msgw_type = type;
                cv_init(&msg_entry.msgw_wake_cv, NULL, 0, NULL);
                list_insert_tail(&qp->msg_wait_rcv, &msg_entry);
                if (qp->msg_snd_smallest > msgsz)
                        qp->msg_snd_smallest = msgsz;
                cvres = cv_wait_sig(&msg_entry.msgw_wake_cv, lock);
                lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, lock);
                qp->msg_snd_cnt--;
                /*
                 * msg_wakeup_senders() unlinks an entry before signalling
                 * it.  If we are still linked we woke for another reason
                 * and must remove ourselves.
                 */
                if (list_link_active(&msg_entry.msgw_list))
                        list_remove(&qp->msg_wait_rcv, &msg_entry);
                if (error = msgq_check_err(qp, cvres)) {
                        goto msgsnd_out;
                }
                /* Remember that a wakeup was spent on us (see msgsnd_out). */
                wait_wakeup = 1;
        }

        if (mp == NULL) {
                int failure;

                /*
                 * The message was too large to preallocate.  Drop the lock
                 * for the allocation and copyin, then go back to "top" so
                 * the space check is repeated under the lock.
                 */
                mutex_exit(lock);
                ASSERT(msgsz > 0);
                mp = kmem_zalloc(sizeof (struct msg), KM_SLEEP);
                mp->msg_addr = kmem_alloc(msgsz, KM_SLEEP);
                mp->msg_size = msgsz;
                mp->msg_copycnt = 1;

                failure = (copyin(STRUCT_FADDR(umsgp, mtext),
                    mp->msg_addr, msgsz) == -1);
                lock = ipc_lock(msq_svc, qp->msg_perm.ipc_id);
                /* A removed queue is reported in preference to EFAULT. */
                if (IPC_FREE(&qp->msg_perm)) {
                        error = EIDRM;
                        goto msgsnd_out;
                }
                if (failure) {
                        error = EFAULT;
                        goto msgsnd_out;
                }
                goto top;
        }

        /*
         * Everything is available, put msg on q.
         */
        qp->msg_qnum++;
        qp->msg_cbytes += msgsz;
        qp->msg_lspid = curproc->p_pid;
        qp->msg_stime = gethrestime_sec();
        mp->msg_type = type;
        /* Keep the lower bound used by msgrcv_lookup() valid. */
        if (qp->msg_lowest_type > type)
                qp->msg_lowest_type = type;
        list_insert_tail(&qp->msg_list, mp);
        /*
         * Get the proper receiver going.
         */
        msg_wakeup_rdr(qp, &qp->msg_fnd_sndr, type);

msgsnd_out:
        /*
         * We were woken up from the send wait list, but an
         * error occurred on placing the message onto the
         * msg queue.  Given that, we need to do the wakeup
         * dance again.
         */

        if (wait_wakeup && error) {
                msg_wakeup_senders(qp);
        }
        /* lock is non-NULL only if ipc_lookup() succeeded and qp is held. */
        if (lock)
                ipc_rele(msq_svc, (kipc_perm_t *)qp);   /* drops lock */

        if (error) {
                /*
                 * The message never made it onto the queue; drop the
                 * reference taken when msg_copycnt was set to 1.
                 */
                if (mp)
                        msg_rele(mp);
                return (set_errno(error));
        }

        return (0);
}
1259 
1260 static void
1261 msg_wakeup_rdr(kmsqid_t *qp, msg_select_t **flist, long type)
1262 {
1263         msg_select_t    *walker = *flist;
1264         msgq_wakeup_t   *wakeup;
1265         uint_t          msg_hash;
1266 
1267         msg_hash = msg_type_hash(type);
1268 
1269         do {
1270                 wakeup = walker->selection(qp, msg_hash, type);
1271                 walker = walker->next_selection;
1272         } while (!wakeup && walker != *flist);
1273 
1274         *flist = (*flist)->next_selection;
1275         if (wakeup) {
1276                 if (type) {
1277                         wakeup->msgw_snd_wake = type;
1278                 }
1279                 cv_signal(&wakeup->msgw_wake_cv);
1280         }
1281 }
1282 
1283 static uint_t
1284 msg_type_hash(long msg_type)
1285 {
1286         if (msg_type < 0) {
1287                 long    hash = -msg_type / MSG_NEG_INTERVAL;
1288                 /*
1289                  * Negative message types are hashed over an
1290                  * interval.  Any message type that hashes
1291                  * beyond MSG_MAX_QNUM is automatically placed
1292                  * in the last bucket.
1293                  */
1294                 if (hash > MSG_MAX_QNUM)
1295                         hash = MSG_MAX_QNUM;
1296                 return (hash);
1297         }
1298 
1299         /*
1300          * 0 or positive message type.  The first bucket is reserved for
1301          * message receivers of type 0, the other buckets we hash into.
1302          */
1303         if (msg_type)
1304                 return (1 + (msg_type % MSG_MAX_QNUM));
1305         return (0);
1306 }
1307 
1308 /*
1309  * Routines to see if we have a receiver of type 0 either blocked waiting
1310  * for a message.  Simply return the first guy on the list.
1311  */
1312 
1313 static msgq_wakeup_t *
1314 /* ARGSUSED */
1315 msg_fnd_any_snd(kmsqid_t *qp, int msg_hash, long type)
1316 {
1317         msgq_wakeup_t   *walker;
1318 
1319         walker = list_head(&qp->msg_wait_snd[0]);
1320 
1321         if (walker)
1322                 list_remove(&qp->msg_wait_snd[0], walker);
1323         return (walker);
1324 }
1325 
1326 static msgq_wakeup_t *
1327 /* ARGSUSED */
1328 msg_fnd_any_rdr(kmsqid_t *qp, int msg_hash, long type)
1329 {
1330         msgq_wakeup_t   *walker;
1331 
1332         walker = list_head(&qp->msg_cpy_block);
1333         if (walker)
1334                 list_remove(&qp->msg_cpy_block, walker);
1335         return (walker);
1336 }
1337 
1338 static msgq_wakeup_t *
1339 msg_fnd_spc_snd(kmsqid_t *qp, int msg_hash, long type)
1340 {
1341         msgq_wakeup_t   *walker;
1342 
1343         walker = list_head(&qp->msg_wait_snd[msg_hash]);
1344 
1345         while (walker && walker->msgw_type != type)
1346                 walker = list_next(&qp->msg_wait_snd[msg_hash], walker);
1347         if (walker)
1348                 list_remove(&qp->msg_wait_snd[msg_hash], walker);
1349         return (walker);
1350 }
1351 
1352 /* ARGSUSED */
1353 static msgq_wakeup_t *
1354 msg_fnd_neg_snd(kmsqid_t *qp, int msg_hash, long type)
1355 {
1356         msgq_wakeup_t   *qptr;
1357         int             count;
1358         int             check_index;
1359         int             neg_index;
1360         int             nbuckets;
1361 
1362         if (!qp->msg_ngt_cnt) {
1363                 return (NULL);
1364         }
1365         neg_index = msg_type_hash(-type);
1366 
1367         /*
1368          * Check for a match among the negative type queues.  Any buckets
1369          * at neg_index or larger can match the type.  Use the last send
1370          * time to randomize the starting bucket to prevent starvation.
1371          * Search all buckets from neg_index to MSG_MAX_QNUM, starting
1372          * from the random starting point, and wrapping around after
1373          * MSG_MAX_QNUM.
1374          */
1375 
1376         nbuckets = MSG_MAX_QNUM - neg_index + 1;
1377         check_index = neg_index + (qp->msg_stime % nbuckets);
1378 
1379         for (count = nbuckets; count > 0; count--) {
1380                 qptr = list_head(&qp->msg_wait_snd_ngt[check_index]);
1381                 while (qptr) {
1382                         /*
1383                          * The lowest hash bucket may actually contain
1384                          * message types that are not valid for this
1385                          * request.  This can happen due to the fact that
1386                          * the message buckets actually contain a consecutive
1387                          * range of types.
1388                          */
1389                         if (-qptr->msgw_type >= type) {
1390                                 list_remove(&qp->msg_wait_snd_ngt[check_index],
1391                                     qptr);
1392                                 return (qptr);
1393                         }
1394                         qptr = list_next(&qp->msg_wait_snd_ngt[check_index],
1395                             qptr);
1396                 }
1397                 if (++check_index > MSG_MAX_QNUM) {
1398                         check_index = neg_index;
1399                 }
1400         }
1401         return (NULL);
1402 }
1403 
1404 static int
1405 msg_rcvq_sleep(list_t *queue, msgq_wakeup_t *entry, kmutex_t **lock,
1406     kmsqid_t *qp)
1407 {
1408         int             cvres;
1409 
1410         cv_init(&entry->msgw_wake_cv, NULL, 0, NULL);
1411 
1412         list_insert_tail(queue, entry);
1413 
1414         qp->msg_rcv_cnt++;
1415         cvres = cv_wait_sig(&entry->msgw_wake_cv, *lock);
1416         *lock = ipc_relock(msq_svc, qp->msg_perm.ipc_id, *lock);
1417         qp->msg_rcv_cnt--;
1418 
1419         if (list_link_active(&entry->msgw_list)) {
1420                 /*
1421                  * We woke up unexpectedly, remove ourself.
1422                  */
1423                 list_remove(queue, entry);
1424         }
1425 
1426         return (cvres);
1427 }
1428 
1429 static void
1430 msg_rcvq_wakeup_all(list_t *q_ptr)
1431 {
1432         msgq_wakeup_t   *q_walk;
1433 
1434         while (q_walk = list_head(q_ptr)) {
1435                 list_remove(q_ptr, q_walk);
1436                 cv_signal(&q_walk->msgw_wake_cv);
1437         }
1438 }
1439 
1440 /*
1441  * msgsys - System entry point for msgctl, msgget, msgrcv, and msgsnd
1442  * system calls.
1443  */
1444 static ssize_t
1445 msgsys(int opcode, uintptr_t a1, uintptr_t a2, uintptr_t a3,
1446         uintptr_t a4, uintptr_t a5)
1447 {
1448         ssize_t error;
1449 
1450         switch (opcode) {
1451         case MSGGET:
1452                 error = msgget((key_t)a1, (int)a2);
1453                 break;
1454         case MSGCTL:
1455                 error = msgctl((int)a1, (int)a2, (void *)a3);
1456                 break;
1457         case MSGRCV:
1458                 error = msgrcv((int)a1, (struct ipcmsgbuf *)a2,
1459                     (size_t)a3, (long)a4, (int)a5);
1460                 break;
1461         case MSGSND:
1462                 error = msgsnd((int)a1, (struct ipcmsgbuf *)a2,
1463                     (size_t)a3, (int)a4);
1464                 break;
1465         case MSGIDS:
1466                 error = msgids((int *)a1, (uint_t)a2, (uint_t *)a3);
1467                 break;
1468         case MSGSNAP:
1469                 error = msgsnap((int)a1, (caddr_t)a2, (size_t)a3, (long)a4);
1470                 break;
1471         default:
1472                 error = set_errno(EINVAL);
1473                 break;
1474         }
1475 
1476         return (error);
1477 }
1478 
1479 /*
1480  * Determine if a writer who is waiting can process its message.  If so
1481  * wake it up.
1482  */
1483 static void
1484 msg_wakeup_senders(kmsqid_t *qp)
1485 
1486 {
1487         struct msgq_wakeup *ptr, *optr;
1488         size_t avail, smallest;
1489         int msgs_out;
1490 
1491         /*
1492          * Is there a writer waiting, and if so, can it be serviced? If
1493          * not return back to the caller.
1494          */
1495         if (IPC_FREE(&qp->msg_perm) || qp->msg_qnum >= qp->msg_qmax)
1496                 return;
1497 
1498         avail = qp->msg_qbytes - qp->msg_cbytes;
1499         if (avail < qp->msg_snd_smallest)
1500                 return;
1501 
1502         ptr = list_head(&qp->msg_wait_rcv);
1503         if (ptr == NULL) {
1504                 qp->msg_snd_smallest = MSG_SMALL_INIT;
1505                 return;
1506         }
1507         optr = ptr;
1508 
1509         /*
1510          * smallest:    minimum message size of all queued writers
1511          *
1512          * avail:       amount of space left on the msgq
1513          *              if all the writers we have woken up are successful.
1514          *
1515          * msgs_out:    is the number of messages on the message queue if
1516          *              all the writers we have woken up are successful.
1517          */
1518 
1519         smallest = MSG_SMALL_INIT;
1520         msgs_out = qp->msg_qnum;
1521         while (ptr) {
1522                 ptr = list_next(&qp->msg_wait_rcv, ptr);
1523                 if (optr->msgw_snd_size <= avail) {
1524                         list_remove(&qp->msg_wait_rcv, optr);
1525                         avail -= optr->msgw_snd_size;
1526                         cv_signal(&optr->msgw_wake_cv);
1527                         msgs_out++;
1528                         if (msgs_out == qp->msg_qmax ||
1529                             avail < qp->msg_snd_smallest)
1530                                 break;
1531                 } else {
1532                         if (smallest > optr->msgw_snd_size)
1533                                 smallest = optr->msgw_snd_size;
1534                 }
1535                 optr = ptr;
1536         }
1537 
1538         /*
1539          * Reset the smallest message size if the entire list has been visited
1540          */
1541         if (ptr == NULL && smallest != MSG_SMALL_INIT)
1542                 qp->msg_snd_smallest = smallest;
1543 }
1544 
#ifdef  _SYSCALL32_IMPL
/*
 * msgsys32 - System entry point for the msgget, msgctl, msgrcv, msgsnd,
 * msgids and msgsnap system calls for 32-bit callers on LP64 kernel.
 *
 * The 32-bit arguments are widened to the types the handlers expect:
 * pointers go through uintptr_t, and the message type arguments of msgrcv
 * and msgsnap, as well as the size argument of msgsnd, go through int32_t
 * first.  An unrecognised opcode fails with EINVAL via set_errno().
 */
static ssize32_t
msgsys32(int opcode, uint32_t a1, uint32_t a2, uint32_t a3,
        uint32_t a4, uint32_t a5)
{
        switch (opcode) {
        case MSGGET:
                return (msgget((key_t)a1, (int)a2));
        case MSGCTL:
                return (msgctl((int)a1, (int)a2, (void *)(uintptr_t)a3));
        case MSGRCV:
                return (msgrcv((int)a1, (struct ipcmsgbuf *)(uintptr_t)a2,
                    (size_t)a3, (long)(int32_t)a4, (int)a5));
        case MSGSND:
                return (msgsnd((int)a1, (struct ipcmsgbuf *)(uintptr_t)a2,
                    (size_t)(int32_t)a3, (int)a4));
        case MSGIDS:
                return (msgids((int *)(uintptr_t)a1, (uint_t)a2,
                    (uint_t *)(uintptr_t)a3));
        case MSGSNAP:
                return (msgsnap((int)a1, (caddr_t)(uintptr_t)a2, (size_t)a3,
                    (long)(int32_t)a4));
        default:
                break;
        }

        return (set_errno(EINVAL));
}
#endif  /* _SYSCALL32_IMPL */