/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License (the "License").
 * You may not use this file except in compliance with the License.
 *
 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
 * or http://www.opensolaris.org/os/licensing.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information: Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 */
/*
 * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
 * Use is subject to license terms.
 */
/*
 * Copyright (c) 2010, Intel Corporation.
 * All rights reserved.
 * Copyright 2018 Joyent, Inc.
 */

#include <sys/types.h>
#include <sys/param.h>
#include <sys/t_lock.h>
#include <sys/thread.h>
#include <sys/cpuvar.h>
#include <sys/x_call.h>
#include <sys/xc_levels.h>
#include <sys/cpu.h>
#include <sys/psw.h>
#include <sys/sunddi.h>
#include <sys/debug.h>
#include <sys/systm.h>
#include <sys/archsystm.h>
#include <sys/machsystm.h>
#include <sys/mutex_impl.h>
#include <sys/stack.h>
#include <sys/promif.h>
#include <sys/x86_archext.h>

/*
 * Implementation for cross-processor calls via interprocessor interrupts
 *
 * This implementation uses a message passing architecture to allow multiple
 * concurrent cross calls to be in flight at any given time. We use the cmpxchg
 * instruction, aka atomic_cas_ptr(), to implement simple efficient work
 * queues for message passing between CPUs with almost no need for regular
 * locking.  See xc_extract() and xc_insert() below.
 *
 * The general idea is that initiating a cross call means putting a message
 * on each target CPU's work queue. Any synchronization is handled by passing
 * the message back and forth between initiator and target(s).
 *
 * Every CPU has xc_work_cnt, which indicates it has messages to process.
 * This value is incremented as message traffic is initiated and decremented
 * with every message that finishes all processing.
 *
 * The code needs no mfence or other membar_*() calls. The uses of
 * atomic_cas_ptr(), atomic_cas_32() and atomic_dec_32() for the message
 * passing are implemented with LOCK prefix instructions which are
 * equivalent to mfence.
 *
 * One interesting aspect of this implementation is that it allows 2 or more
 * CPUs to initiate cross calls to intersecting sets of CPUs at the same time.
 * The cross call processing by the CPUs will happen in any order with only
 * a guarantee, for xc_call() and xc_sync(), that an initiator won't return
 * from cross calls before all slaves have invoked the function.
 *
 * The reason for this asynchronous approach is to allow for fast global
 * TLB shootdowns. If all N CPUs tried to do a global TLB invalidation on
 * different virtual addresses at the same time, the old code required
 * N squared IPIs. With this method, depending on timing, it can happen
 * with just N IPIs.
 *
 * Here are the normal transitions for XC_MSG_* values in ->xc_command. A
 * transition of "->" happens in the slave cpu and "=>" happens in the master
 * cpu as the messages are passed back and forth.
 *
 * FREE => ASYNC ->                       DONE => FREE
 * FREE => CALL ->                        DONE => FREE
 * FREE => SYNC -> WAITING => RELEASED -> DONE => FREE
 *
 * The interesting one above is ASYNC. You might ask, why not go directly
 * to FREE, instead of DONE? If it did that, it might be possible to exhaust
 * the master's xc_free list if a master can generate ASYNC messages faster
 * than the slave can process them. That could be handled with more elaborate
 * bookkeeping. However, since nothing important uses ASYNC, I've not bothered.
 */
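
/*
 * As a usage sketch (the caller, handler and variables here are hypothetical,
 * not part of this file): a client typically builds a CPU bitmask and hands
 * it, together with an xc_func_t handler, to one of xc_call_nowait(),
 * xc_call() or xc_sync() at the bottom of this file, e.g.
 *
 *      cpuset_t set;
 *
 *      CPUSET_ZERO(set);
 *      CPUSET_ADD(set, target_cpu_id);
 *      xc_call((xc_arg_t)va, 0, 0, CPUSET2BV(set), my_invalidate_func);
 *
 * where my_invalidate_func has the xc_func_t signature and is invoked on
 * each selected, CPU_READY cpu at XC_HI_PIL.
 */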

/*
 * By default, collecting counts of IPI information is not enabled, since
 * updating shared cachelines could cause excess bus traffic.
 */
uint_t xc_collect_enable = 0;
uint64_t xc_total_cnt = 0;      /* total #IPIs sent for cross calls */
uint64_t xc_multi_cnt = 0;      /* # times we piggybacked on another IPI */

/*
 * We allow for one high priority message at a time to happen in the system.
 * This is used for panic, kmdb, etc., so no locking is done.
 */
static volatile cpuset_t xc_priority_set_store;
static volatile ulong_t *xc_priority_set = CPUSET2BV(xc_priority_set_store);
static xc_data_t xc_priority_data;

/*
 * Decrement a CPU's work count
 */
static void
xc_decrement(struct machcpu *mcpu)
{
        atomic_dec_32(&mcpu->xc_work_cnt);
}

/*
 * Increment a CPU's work count and return the old value
 */
static int
xc_increment(struct machcpu *mcpu)
{
        int old;
        do {
                old = mcpu->xc_work_cnt;
        } while (atomic_cas_32(&mcpu->xc_work_cnt, old, old + 1) != old);
        return (old);
}

/*
 * Put a message into a queue. The insertion is atomic no matter
 * how many different inserts/extracts to the same queue happen.
 */
static void
xc_insert(void *queue, xc_msg_t *msg)
{
        xc_msg_t *old_head;

        /*
         * FREE messages should only ever be getting inserted into
         * the xc_master CPU's xc_free queue.
         */
        ASSERT(msg->xc_command != XC_MSG_FREE ||
            cpu[msg->xc_master] == NULL || /* possible only during init */
            queue == &cpu[msg->xc_master]->cpu_m.xc_free);

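        /*
         * Lock-free LIFO push: point the new message at the current head
         * and retry the compare-and-swap if another CPU changed the head
         * in the meantime.
         */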
        do {
                old_head = (xc_msg_t *)*(volatile xc_msg_t **)queue;
                msg->xc_next = old_head;
        } while (atomic_cas_ptr(queue, old_head, msg) != old_head);
}

/*
 * Extract a message from a queue. The extraction is atomic only
 * when just one thread does extractions from the queue.
 * If the queue is empty, NULL is returned.
 */
static xc_msg_t *
xc_extract(xc_msg_t **queue)
{
        xc_msg_t *old_head;

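        /*
         * Lock-free LIFO pop: atomically swing the head to its successor,
         * retrying if an insert changed the head first. Extraction is only
         * ever done by a single thread per queue (see the comment above),
         * which keeps this compare-and-swap free of A-B-A hazards.
         */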
        do {
                old_head = (xc_msg_t *)*(volatile xc_msg_t **)queue;
                if (old_head == NULL)
                        return (old_head);
        } while (atomic_cas_ptr(queue, old_head, old_head->xc_next) !=
            old_head);
        old_head->xc_next = NULL;
        return (old_head);
}

/*
 * Extract the next message from the CPU's queue, and place the message in
 * .xc_curmsg.  The latter is solely to make debugging (and ::xcall) more
 * useful.
 */
static xc_msg_t *
xc_get(void)
{
        struct machcpu *mcpup = &CPU->cpu_m;
        xc_msg_t *msg = xc_extract(&mcpup->xc_msgbox);
        mcpup->xc_curmsg = msg;
        return (msg);
}

/*
 * Initialize the machcpu fields used for cross calls
 */
static uint_t xc_initialized = 0;

void
xc_init_cpu(struct cpu *cpup)
{
        xc_msg_t *msg;
        int c;

        /*
         * Allocate message buffers for the new CPU.
         */
        for (c = 0; c < max_ncpus; ++c) {
                if (plat_dr_support_cpu()) {
                        /*
                         * Allocate a message buffer for every CPU possible
                         * in system, including our own, and add them to our xc
                         * message queue.
                         */
                        msg = kmem_zalloc(sizeof (*msg), KM_SLEEP);
                        msg->xc_command = XC_MSG_FREE;
                        msg->xc_master = cpup->cpu_id;
                        xc_insert(&cpup->cpu_m.xc_free, msg);
                } else if (cpu[c] != NULL && cpu[c] != cpup) {
                        /*
                         * Add a new message buffer to each existing CPU's
                         * free list, as well as one to this CPU's free list
                         * for each of them. Note: cpu0 is statically
                         * inserted into the cpu[] array, so check that
                         * cpu[c] isn't cpup itself to avoid allocating
                         * extra message buffers for cpu0.
                         */
                        msg = kmem_zalloc(sizeof (*msg), KM_SLEEP);
                        msg->xc_command = XC_MSG_FREE;
                        msg->xc_master = c;
                        xc_insert(&cpu[c]->cpu_m.xc_free, msg);

                        msg = kmem_zalloc(sizeof (*msg), KM_SLEEP);
                        msg->xc_command = XC_MSG_FREE;
                        msg->xc_master = cpup->cpu_id;
                        xc_insert(&cpup->cpu_m.xc_free, msg);
                }
        }

        if (!plat_dr_support_cpu()) {
                /*
                 * Add one for self messages if CPU hotplug is disabled.
                 */
                msg = kmem_zalloc(sizeof (*msg), KM_SLEEP);
                msg->xc_command = XC_MSG_FREE;
                msg->xc_master = cpup->cpu_id;
                xc_insert(&cpup->cpu_m.xc_free, msg);
        }

        if (!xc_initialized)
                xc_initialized = 1;
}

void
xc_fini_cpu(struct cpu *cpup)
{
        xc_msg_t *msg;

        ASSERT((cpup->cpu_flags & CPU_READY) == 0);
        ASSERT(cpup->cpu_m.xc_msgbox == NULL);
        ASSERT(cpup->cpu_m.xc_work_cnt == 0);

        while ((msg = xc_extract(&cpup->cpu_m.xc_free)) != NULL) {
                kmem_free(msg, sizeof (*msg));
        }
}

#define XC_FLUSH_MAX_WAITS              1000

/* Flush inflight message buffers. */
int
xc_flush_cpu(struct cpu *cpup)
{
        int i;

        ASSERT((cpup->cpu_flags & CPU_READY) == 0);

        /*
         * Pause all working CPUs, which ensures that no CPU is in
         * xc_common(). This works around a race condition window in
         * xc_common() between checking the CPU_READY flag and
         * incrementing the work count.
         */
        pause_cpus(cpup, NULL);
        start_cpus();

        for (i = 0; i < XC_FLUSH_MAX_WAITS; i++) {
                if (cpup->cpu_m.xc_work_cnt == 0) {
                        break;
                }
                DELAY(1);
        }
        for (; i < XC_FLUSH_MAX_WAITS; i++) {
                if (!BT_TEST(xc_priority_set, cpup->cpu_id)) {
                        break;
                }
                DELAY(1);
        }

        return (i >= XC_FLUSH_MAX_WAITS ? ETIME : 0);
}

/*
 * X-call message processing routine. Note that this is used by both
 * senders and recipients of messages.
 *
 * We're protected against changing CPUs by either being in a high-priority
 * interrupt, having preemption disabled or by having a raised SPL.
 */
/*ARGSUSED*/
uint_t
xc_serv(caddr_t arg1, caddr_t arg2)
{
        struct machcpu *mcpup = &(CPU->cpu_m);
        xc_msg_t *msg;
        xc_data_t *data;
        xc_msg_t *xc_waiters = NULL;
        uint32_t num_waiting = 0;
        xc_func_t func;
        xc_arg_t a1;
        xc_arg_t a2;
        xc_arg_t a3;
        uint_t rc = DDI_INTR_UNCLAIMED;

        while (mcpup->xc_work_cnt != 0) {
                rc = DDI_INTR_CLAIMED;

                /*
                 * We may have to wait for a message to arrive.
                 */
                for (msg = NULL; msg == NULL; msg = xc_get()) {

                        /*
                         * Always check for and handle a priority message.
                         */
                        if (BT_TEST(xc_priority_set, CPU->cpu_id)) {
                                func = xc_priority_data.xc_func;
                                a1 = xc_priority_data.xc_a1;
                                a2 = xc_priority_data.xc_a2;
                                a3 = xc_priority_data.xc_a3;
                                BT_ATOMIC_CLEAR(xc_priority_set, CPU->cpu_id);
                                xc_decrement(mcpup);
                                func(a1, a2, a3);
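                                /*
                                 * If the priority request was the only
                                 * outstanding work, we're finished.
                                 */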
                                if (mcpup->xc_work_cnt == 0)
                                        return (rc);
                        }

                        /*
                         * wait for a message to arrive
                         */
                        SMT_PAUSE();
                }


                /*
                 * process the message
                 */
                switch (msg->xc_command) {

                /*
                 * ASYNC gives back the message immediately, then we do the
                 * function and return with no more waiting.
                 */
                case XC_MSG_ASYNC:
                        data = &cpu[msg->xc_master]->cpu_m.xc_data;
                        func = data->xc_func;
                        a1 = data->xc_a1;
                        a2 = data->xc_a2;
                        a3 = data->xc_a3;
                        msg->xc_command = XC_MSG_DONE;
                        xc_insert(&cpu[msg->xc_master]->cpu_m.xc_msgbox, msg);
                        if (func != NULL)
                                (void) (*func)(a1, a2, a3);
                        xc_decrement(mcpup);
                        break;

                /*
                 * SYNC messages do the call, then send it back to the master
                 * in WAITING mode
                 */
                case XC_MSG_SYNC:
                        data = &cpu[msg->xc_master]->cpu_m.xc_data;
                        if (data->xc_func != NULL)
                                (void) (*data->xc_func)(data->xc_a1,
                                    data->xc_a2, data->xc_a3);
                        msg->xc_command = XC_MSG_WAITING;
                        xc_insert(&cpu[msg->xc_master]->cpu_m.xc_msgbox, msg);
                        break;

                /*
                 * WAITING messages are collected by the master until all
                 * have arrived. Once all arrive, we release them back to
                 * the slaves
                 */
                case XC_MSG_WAITING:
                        xc_insert(&xc_waiters, msg);
                        if (++num_waiting < mcpup->xc_wait_cnt)
                                break;
                        while ((msg = xc_extract(&xc_waiters)) != NULL) {
                                msg->xc_command = XC_MSG_RELEASED;
                                xc_insert(&cpu[msg->xc_slave]->cpu_m.xc_msgbox,
                                    msg);
                                --num_waiting;
                        }
                        if (num_waiting != 0)
                                panic("wrong number waiting");
                        mcpup->xc_wait_cnt = 0;
                        break;

                /*
                 * CALL messages do the function and then, like RELEASED,
                 * send the message back to the master as DONE.
                 */
                case XC_MSG_CALL:
                        data = &cpu[msg->xc_master]->cpu_m.xc_data;
                        if (data->xc_func != NULL)
                                (void) (*data->xc_func)(data->xc_a1,
                                    data->xc_a2, data->xc_a3);
                        /*FALLTHROUGH*/
                case XC_MSG_RELEASED:
                        msg->xc_command = XC_MSG_DONE;
                        xc_insert(&cpu[msg->xc_master]->cpu_m.xc_msgbox, msg);
                        xc_decrement(mcpup);
                        break;

                /*
                 * DONE means a slave has completely finished up.
                 * Once we collect all the DONE messages, we'll exit
                 * processing too.
                 */
                case XC_MSG_DONE:
                        msg->xc_command = XC_MSG_FREE;
                        xc_insert(&mcpup->xc_free, msg);
                        xc_decrement(mcpup);
                        break;

                case XC_MSG_FREE:
                        panic("free message 0x%p in msgbox", (void *)msg);
                        break;

                default:
                        panic("bad message 0x%p in msgbox", (void *)msg);
                        break;
                }

                CPU->cpu_m.xc_curmsg = NULL;
        }
        return (rc);
}

/*
 * Initiate cross call processing.
 */
static void
xc_common(
        xc_func_t func,
        xc_arg_t arg1,
        xc_arg_t arg2,
        xc_arg_t arg3,
        ulong_t *set,
        uint_t command)
{
        int c;
        struct cpu *cpup;
        xc_msg_t *msg;
        xc_data_t *data;
        int cnt;
        int save_spl;

        if (!xc_initialized) {
                if (BT_TEST(set, CPU->cpu_id) && (CPU->cpu_flags & CPU_READY) &&
                    func != NULL)
                        (void) (*func)(arg1, arg2, arg3);
                return;
        }

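        /*
         * Raise the SPL to XC_HI_PIL, which keeps us on this CPU (see the
         * comment above xc_serv()) while we compose messages and then
         * service them via xc_serv() below.
         */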
        save_spl = splr(ipltospl(XC_HI_PIL));

        /*
         * fill in cross call data
         */
        data = &CPU->cpu_m.xc_data;
        data->xc_func = func;
        data->xc_a1 = arg1;
        data->xc_a2 = arg2;
        data->xc_a3 = arg3;

        /*
         * Post messages to all CPUs involved that are CPU_READY
         */
        CPU->cpu_m.xc_wait_cnt = 0;
        for (c = 0; c < max_ncpus; ++c) {
                if (!BT_TEST(set, c))
                        continue;
                cpup = cpu[c];
                if (cpup == NULL || !(cpup->cpu_flags & CPU_READY))
                        continue;

                /*
                 * Fill out a new message.
                 */
                msg = xc_extract(&CPU->cpu_m.xc_free);
                if (msg == NULL)
                        panic("Ran out of free xc_msg_t's");
                msg->xc_command = command;
                if (msg->xc_master != CPU->cpu_id)
                        panic("msg %p has wrong xc_master", (void *)msg);
                msg->xc_slave = c;

                /*
                 * Increment my work count for all messages that I'll
                 * transition from DONE to FREE.
                 * Also remember how many XC_MSG_WAITINGs to look for
                 */
                (void) xc_increment(&CPU->cpu_m);
                if (command == XC_MSG_SYNC)
                        ++CPU->cpu_m.xc_wait_cnt;

                /*
                 * Increment the target CPU work count then insert the message
                 * in the target msgbox. If I post the first bit of work
                 * for the target to do, send an IPI to the target CPU.
                 */
                cnt = xc_increment(&cpup->cpu_m);
                xc_insert(&cpup->cpu_m.xc_msgbox, msg);
                if (cpup != CPU) {
                        if (cnt == 0) {
                                CPU_STATS_ADDQ(CPU, sys, xcalls, 1);
                                send_dirint(c, XC_HI_PIL);
                                if (xc_collect_enable)
                                        ++xc_total_cnt;
                        } else if (xc_collect_enable) {
                                ++xc_multi_cnt;
                        }
                }
        }

        /*
         * Now drop into the message handler until all work is done
         */
        (void) xc_serv(NULL, NULL);
        splx(save_spl);
}

/*
 * Push out a priority cross call.
 */
static void
xc_priority_common(
        xc_func_t func,
        xc_arg_t arg1,
        xc_arg_t arg2,
        xc_arg_t arg3,
        ulong_t *set)
{
        int i;
        int c;
        struct cpu *cpup;

        /*
         * Wait briefly for any previous xc_priority to have finished.
         */
        for (c = 0; c < max_ncpus; ++c) {
                cpup = cpu[c];
                if (cpup == NULL || !(cpup->cpu_flags & CPU_READY))
                        continue;

                /*
                 * The value of 40000 here is from old kernel code. It
                 * really should be changed to some time based value, since
                 * under a hypervisor, there's no guarantee a remote CPU
                 * is even scheduled.
                 */
                for (i = 0; BT_TEST(xc_priority_set, c) && i < 40000; ++i)
                        SMT_PAUSE();

                /*
                 * Some CPU did not respond to a previous priority request. It's
                 * probably deadlocked with interrupts blocked or some such
                 * problem. We'll just erase the previous request - which was
                 * most likely a kmdb_enter that has already expired - and plow
                 * ahead.
                 */
                if (BT_TEST(xc_priority_set, c)) {
                        BT_ATOMIC_CLEAR(xc_priority_set, c);
                        if (cpup->cpu_m.xc_work_cnt > 0)
                                xc_decrement(&cpup->cpu_m);
                }
        }

        /*
         * fill in cross call data
         */
        xc_priority_data.xc_func = func;
        xc_priority_data.xc_a1 = arg1;
        xc_priority_data.xc_a2 = arg2;
        xc_priority_data.xc_a3 = arg3;

        /*
         * Post messages to all CPUs involved that are CPU_READY
         * We'll always IPI, plus bang on the xc_msgbox for i86_mwait()
         */
        for (c = 0; c < max_ncpus; ++c) {
                if (!BT_TEST(set, c))
                        continue;
                cpup = cpu[c];
                if (cpup == NULL || !(cpup->cpu_flags & CPU_READY) ||
                    cpup == CPU)
                        continue;
                (void) xc_increment(&cpup->cpu_m);
                BT_ATOMIC_SET(xc_priority_set, c);
                send_dirint(c, XC_HI_PIL);
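                /*
                 * The no-op compare-and-swaps below touch the target's
                 * xc_msgbox cacheline; per the comment above, this is
                 * intended to nudge a CPU idling in i86_mwait() so it
                 * notices the pending priority request.
                 */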
                for (i = 0; i < 10; ++i) {
                        (void) atomic_cas_ptr(&cpup->cpu_m.xc_msgbox,
                            cpup->cpu_m.xc_msgbox, cpup->cpu_m.xc_msgbox);
                }
        }
}

/*
 * Do cross call to all other CPUs with absolutely no waiting or handshaking.
 * This should only be used for extraordinary operations, like panic(), which
 * need to work, in some fashion, in a not completely functional system.
 * All other uses that want minimal waiting should use xc_call_nowait().
 */
void
xc_priority(
        xc_arg_t arg1,
        xc_arg_t arg2,
        xc_arg_t arg3,
        ulong_t *set,
        xc_func_t func)
{
        extern int IGNORE_KERNEL_PREEMPTION;
        int save_spl = splr(ipltospl(XC_HI_PIL));
        int save_kernel_preemption = IGNORE_KERNEL_PREEMPTION;

        IGNORE_KERNEL_PREEMPTION = 1;
        xc_priority_common((xc_func_t)func, arg1, arg2, arg3, set);
        IGNORE_KERNEL_PREEMPTION = save_kernel_preemption;
        splx(save_spl);
}

/*
 * Wrapper for kmdb to capture other CPUs, causing them to enter the debugger.
 */
void
kdi_xc_others(int this_cpu, void (*func)(void))
{
        extern int IGNORE_KERNEL_PREEMPTION;
        int save_kernel_preemption;
        cpuset_t set;

        if (!xc_initialized)
                return;

        save_kernel_preemption = IGNORE_KERNEL_PREEMPTION;
        IGNORE_KERNEL_PREEMPTION = 1;
        CPUSET_ALL_BUT(set, this_cpu);
        xc_priority_common((xc_func_t)func, 0, 0, 0, CPUSET2BV(set));
        IGNORE_KERNEL_PREEMPTION = save_kernel_preemption;
}

/*
 * Invoke function on specified processors. Remotes may continue after
 * service with no waiting. xc_call_nowait() may return immediately too.
 */
void
xc_call_nowait(
        xc_arg_t arg1,
        xc_arg_t arg2,
        xc_arg_t arg3,
        ulong_t *set,
        xc_func_t func)
{
        xc_common(func, arg1, arg2, arg3, set, XC_MSG_ASYNC);
}

/*
 * Invoke function on specified processors. Remotes may continue after
 * service with no waiting. xc_call() returns only after remotes have finished.
 */
void
xc_call(
        xc_arg_t arg1,
        xc_arg_t arg2,
        xc_arg_t arg3,
        ulong_t *set,
        xc_func_t func)
{
        xc_common(func, arg1, arg2, arg3, set, XC_MSG_CALL);
}

/*
 * Invoke function on specified processors. Remotes wait until all have
 * finished. xc_sync() also waits until all remotes have finished.
 */
void
xc_sync(
        xc_arg_t arg1,
        xc_arg_t arg2,
        xc_arg_t arg3,
        ulong_t *set,
        xc_func_t func)
{
        xc_common(func, arg1, arg2, arg3, set, XC_MSG_SYNC);
}