1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 
  22 /*
  23  * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
  24  */
  25 
  26 #include <unistd.h>
  27 #include <sys/types.h>
  28 #include <sys/stat.h>
  29 #include <sys/statvfs.h>
  30 #include <sys/uadmin.h>
  31 #include <sys/resource.h>
  32 #include <fcntl.h>
  33 #include <stdio.h>
  34 #include <thread.h>
  35 #include <meta.h>
  36 #include <sdssc.h>
  37 #include <mdmn_changelog.h>
  38 #include "mdmn_subr.h"
  39 
  40 /*
  41  * This is the communication daemon for SVM Multi Node Disksets.
  42  * It runs on every node and provides the following rpc services:
  43  *  - mdmn_send_svc_2
  44  *  - mdmn_work_svc_2
  45  *  - mdmn_wakeup_initiator_svc_2
  46  *  - mdmn_wakeup_master_svc_2
  47  *  - mdmn_comm_lock_svc_2
  48  *  - mdmn_comm_unlock_svc_2
  49  *  - mdmn_comm_suspend_svc_2
  50  *  - mdmn_comm_resume_svc_2
  51  *  - mdmn_comm_reinit_set_svc_2
  52  * where send, lock, unlock and reinit are meant for external use,
  53  * work and the two wakeups are for internal use only.
  54  *
  55  * NOTE:
  56  * On every node only one of those xxx_2 functions can be active at the
  57  * same time because the daemon is single threaded.
  58  *
  59  * (not quite true, as mdmn_send_svc_2 and mdmn_work_svc_2 do thr_create()s
  60  * as part of their handlers, so those aspects are multi-threaded)
  61  *
  62  * In case an event occurs that has to be propagated to all the nodes...
  63  *
  64  * One node (the initiator)
  65  *      calls the libmeta function mdmn_send_message()
  66  *      This function calls the local daemon thru mdmn_send_svc_2.
  67  *
  68  * On the initiator:
  69  *      mdmn_send_svc_2()
  70  *          - starts a thread -> mdmn_send_to_work() and returns.
  71  *      mdmn_send_to_work()
  72  *          - sends this message over to the master of the diskset.
  73  *            This is done by calling mdmn_work_svc_2 on the master.
  74  *          - registers to the initiator_table
  75  *          - exits without doing a svc_sendreply() for the call to
  76  *            mdmn_send_svc_2. This means that call is blocked until somebody
  77  *            (see end of this comment) does a svc_sendreply().
  78  *            This means mdmn_send_message() does not yet return.
  79  *          - A timeout surveillance is started at this point.
  80  *            This means in case the master doesn't reply at all in an
 *            appropriate time, an error condition is returned
  82  *            to the caller.
  83  *
  84  * On the master:
  85  *      mdmn_work_svc_2()
  86  *          - starts a thread -> mdmn_master_process_msg() and returns
  87  *      mdmn_master_process_msg()
  88  *          - logs the message to the change log
  89  *          - executes the message locally
  90  *          - flags the message in the change log
  91  *          - sends the message to mdmn_work_svc_2() on all the
  92  *            other nodes (slaves)
  93  *            after each call to mdmn_work_svc_2 the thread goes to sleep and
  94  *            will be woken up by mdmn_wakeup_master_svc_2() as soon as the
  95  *            slave node is done with this message.
 *          - In case the slave doesn't respond in an appropriate time, an
 *            error is assumed to ensure the master doesn't wait forever.
  98  *
  99  * On a slave:
 100  *      mdmn_work_svc_2()
 101  *          - starts a thread -> mdmn_slave_process_msg() and returns
 102  *      mdmn_slave_process_msg()
 103  *          - processes this message locally by calling the appropriate message
 104  *            handler, that creates some result.
 105  *          - sends that result thru a call to mdmn_wakeup_master_svc_2() to
 106  *            the master.
 107  *
 108  * Back on the master:
 109  *      mdmn_wakeup_master_svc_2()
 110  *          - stores the result into the master_table.
 111  *          - signals the mdmn_master_process_msg-thread.
 112  *          - returns
 113  *      mdmn_master_process_msg()
 114  *          - after getting the results from all nodes
 115  *          - sends them back to the initiating node thru a call to
 116  *            mdmn_wakeup_initiator_svc_2.
 117  *
 118  * Back on the initiator:
 119  *      mdmn_wakeup_initiator_svc_2()
 120  *          - calls svc_sendreply() which makes the call to mdmn_send_svc_2()
 121  *            return.
 122  *            which allows the initial mdmn_send_message() call to return.
 123  */
 124 
/*
 * Debug output stream of the commd. It is (re)opened by setup_debug();
 * flush_fcout() treats a NULL value as "no output file".
 */
FILE *commdout;         /* debug output for the commd */
char *commdoutfile;     /* file name for the above output */
/*
 * want at least 10 MB free space when logging into a file
 * (checked periodically by flush_fcout())
 */
#define MIN_FS_SPACE    (10LL * 1024 * 1024)

/*
 * Number of outstanding messages that were initiated by this node.
 * If zero, check_timeouts goes to sleep on check_timeout_cv.
 */
uint_t  messages_on_their_way;
mutex_t check_timeout_mutex;    /* need mutex to protect above */
cond_t  check_timeout_cv;       /* trigger for check_timeouts */

/* for printing out time stamps; set in global_init() from gethrtime() */
hrtime_t __savetime;

/*
 * RPC clients for every set and every node and their protecting locks.
 * A NULL entry means that no client has been created yet for that
 * set/node combination (see check_client() and mdmn_init_client()).
 */
CLIENT  *client[MD_MAXSETS][NNODES];
rwlock_t client_rwlock[MD_MAXSETS];

/* the descriptors of all possible sets and their protectors */
struct md_set_desc *set_descriptor[MD_MAXSETS];
rwlock_t set_desc_rwlock[MD_MAXSETS];

/*
 * the daemon to daemon communication has to timeout quickly.
 * mdmn_init_client() installs this as CLSET_TIMEOUT on every new client.
 */
static struct timeval FOUR_SECS = { 4, 0 };

/*
 * These indicate if a set has already been setup.
 * Each entry is a mask of MDMN_SET_* bits, maintained by mdmn_init_set();
 * check_timeouts() only looks at sets whose entry equals MDMN_SET_READY.
 */
int md_mn_set_inited[MD_MAXSETS];

/* For every set we have a message completion table and protecting mutexes */
md_mn_mct_t *mct[MD_MAXSETS];
mutex_t mct_mutex[MD_MAXSETS][MD_MN_NCLASSES];

/* Stuff to describe the global status of the commd on one node */
#define MD_CGS_INITED           0x0001  /* global_init() has completed */
#define MD_CGS_ABORTED          0x0002  /* return everything with MDMNE_ABORT */
uint_t md_commd_global_state = 0;       /* No state when starting up */

/*
 * Global verbosity level for the daemon.
 * Loaded by setup_debug() through commd_get_verbosity(); flush_fcout()
 * resets it to MD_MMV_NULL when the log filesystem runs low on space.
 */
uint_t md_commd_global_verb;

/*
 * libmeta doesn't like multiple threads in metaget_setdesc().
 * So we must protect access to it with a global lock
 */
mutex_t get_setdesc_mutex;

/*
 * Need a way to block single message types,
 * hence an array with a status for every message type
 */
uint_t msgtype_lock_state[MD_MN_NMESSAGES];

/* for reading in the config file */
#define MAX_LINE_SIZE 1024

/*
 * Accessors for the debug settings, defined outside this file.
 * setup_debug() uses them to obtain the output file name and the
 * verbosity level.
 */
extern char *commd_get_outfile(void);
extern uint_t commd_get_verbosity(void);
 187 /*
 188  * mdmn_clnt_create is a helper function for meta_client_create_retry.  It
 189  * merely needs to call clnt_create_timed, and meta_client_create_retry
 190  * will take care of the rest.
 191  */
 192 /* ARGSUSED */
 193 static CLIENT *
 194 mdmn_clnt_create(char *ignore, void *data, struct timeval *time_out)
 195 {
 196         md_mnnode_desc  *node = (md_mnnode_desc *)data;
 197 
 198         return (clnt_create_timed(node->nd_priv_ic, MDMN_COMMD, TWO, "tcp",
 199             time_out));
 200 }
 201 
 202 #define FLUSH_DEBUGFILE() \
 203         if (commdout != (FILE *)NULL) { \
 204                 (void) fflush(commdout); \
 205                 (void) fsync(fileno(commdout)); \
 206         }
 207 
 208 static void
 209 panic_system(int nid, md_mn_msgtype_t type, int master_err, int master_exitval,
 210     md_mn_result_t *slave_result)
 211 {
 212         md_mn_commd_err_t       commd_err;
 213         md_error_t              mne = mdnullerror;
 214         char                    *msg_buf;
 215 
 216         msg_buf = (char *)calloc(MAXPATHLEN + 1, sizeof (char));
 217 
 218         FLUSH_DEBUGFILE();
 219 
 220         if (master_err != MDMNE_ACK) {
 221                 (void) snprintf(msg_buf, MAXPATHLEN, "rpc.mdcommd: RPC "
 222                     "fail on master when processing message type %d\n", type);
 223         } else if (slave_result == NULL) {
 224                 (void) snprintf(msg_buf, MAXPATHLEN, "rpc.mdcommd: RPC fail "
 225                     "on node %d when processing message type %d\n", nid, type);
 226         } else {
 227                 (void) snprintf(msg_buf, MAXPATHLEN, "rpc.mdcommd: "
 228                     "Inconsistent return value from node %d when processing "
 229                     "message type %d. Master exitval = %d, "
 230                     "Slave exitval = %d\n", nid, type, master_exitval,
 231                     slave_result->mmr_exitval);
 232         }
 233         commd_err.size = strlen(msg_buf);
 234         commd_err.md_message = (uint64_t)(uintptr_t)&msg_buf[0];
 235 
 236         (void) metaioctl(MD_MN_COMMD_ERR, &commd_err, &mne, "rpc.mdcommd");
 237         (void) uadmin(A_DUMP, AD_BOOT, (uintptr_t)NULL);
 238 }
 239 
 240 static void
 241 flush_fcout()
 242 {
 243         struct statvfs64 vfsbuf;
 244         long long avail_bytes;
 245         int warned = 0;
 246 
 247         for (; ; ) {
 248                 (void) sleep(10);
 249                 /* No output file, nothing to do */
 250                 if (commdout == (FILE *)NULL)
 251                         continue;
 252 
 253                 /*
 254                  * stat the appropriate filesystem to check for available space.
 255                  */
 256                 if (statvfs64(commdoutfile, &vfsbuf)) {
 257                         continue;
 258                 }
 259 
 260                 avail_bytes = vfsbuf.f_frsize * vfsbuf.f_bavail;
 261                 /*
 262                  * If we don't have enough space, we print out a warning.
 263                  * And we drop the verbosity level to NULL
 264                  * In case the condtion doesn't go away, we don't repeat
 265                  * the warning.
 266                  */
 267                 if (avail_bytes < MIN_FS_SPACE) {
 268                         if (warned) {
 269                                 continue;
 270                         }
 271                         commd_debug(MD_MMV_SYSLOG,
 272                             "NOT enough space available for logging\n");
 273                         commd_debug(MD_MMV_SYSLOG,
 274                             "Have %lld bytes, need %lld bytes\n",
 275                             avail_bytes, MIN_FS_SPACE);
 276                         warned = 1;
 277                         md_commd_global_verb = MD_MMV_NULL;
 278                 } else {
 279                         warned = 0;
 280                 }
 281 
 282                 (void) fflush(commdout);
 283         }
 284 }
 285 
 286 /* safer version of clnt_destroy. If clnt is NULL don't do anything */
 287 #define mdmn_clnt_destroy(clnt) {       \
 288         if (clnt)                       \
 289                 clnt_destroy(clnt);     \
 290 }
 291 
 292 /*
 293  * Own version of svc_sendreply that checks the integrity of the transport
 294  * handle and so prevents us from core dumps in the real svc_sendreply()
 295  */
 296 void
 297 mdmn_svc_sendreply(SVCXPRT *transp, xdrproc_t xdr, caddr_t data)
 298 {
 299         if (SVC_STAT(transp) == XPRT_DIED) {
 300                 commd_debug(MD_MMV_MISC,
 301                     "mdmn_svc_sendreply: XPRT_DIED\n");
 302                 return;
 303         }
 304         (void) svc_sendreply(transp, xdr, data);
 305 }
 306 
 307 /*
 308  * timeout_initiator(set, class)
 309  *
 310  * Alas, I sent a message and didn't get a response back in aproppriate time.
 311  *
 312  * timeout_initiator() takes care for doing the needed svc_sendreply() to the
 313  * calling mdmn_send_message, so that guy doesn't wait forever
 314  * What is done here is pretty much the same as what is done in
 315  * wakeup initiator. The difference is that we cannot provide for any results,
 316  * of course and we set the comm_state to MDMNE_TIMEOUT.
 317  *
 318  * By doing so, mdmn_send_message can decide if a retry would make sense or not.
 319  * It's not our's to decide that here.
 320  */
 321 void
 322 timeout_initiator(set_t setno, md_mn_msgclass_t class)
 323 {
 324         SVCXPRT         *transp;
 325         md_mn_msgid_t   mid;
 326         md_mn_result_t *resultp;
 327 
 328         resultp = Zalloc(sizeof (md_mn_result_t));
 329         resultp->mmr_comm_state      = MDMNE_TIMEOUT;
 330 
 331         commd_debug(MD_MMV_MISC,
 332             "timeout_initiator set = %d, class = %d\n", setno, class);
 333 
 334         transp = mdmn_get_initiator_table_transp(setno, class);
 335         mdmn_get_initiator_table_id(setno, class, &mid);
 336 
 337         commd_debug(MD_MMV_MISC, "timeout_ini: (%d, 0x%llx-%d)\n",
 338             MSGID_ELEMS(mid));
 339         /*
 340          * Give the result the corresponding msgid from the failed message.
 341          */
 342         MSGID_COPY(&mid, &(resultp->mmr_msgid));
 343 
 344         /* return to mdmn_send_message() and let it deal with the situation */
 345         mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
 346 
 347         free(resultp);
 348         commd_debug(MD_MMV_MISC, "timeout_ini: sendreplied\n");
 349         svc_done(transp);
 350         mdmn_unregister_initiator_table(setno, class);
 351 }
 352 
 353 
 354 /*
 355  * check_timeouts - thread
 356  *
 357  * This implements a timeout surveillance for messages sent from the
 358  * initiator to the master.
 359  *
 360  * If a message is started, this thread is triggered thru
 361  * cond_signal(&check_timeout_cv) and we keep track of the numbers of
 362  * messages that are outstanding (messages_on_their_way).
 363  *
 364  * As long as there are messages on their way, this thread never goes to sleep.
 365  * It'll keep checking all class/set combinations for outstanding messages.
 366  * If one is found, it's checked if this message is overdue. In that case,
 367  * timeout_initiator() is called to wakeup the calling mdmn_send_message and
 368  * to clean up the mess.
 369  *
 370  * If the result from the master arrives later, this message is considered
 371  * to be unsolicited. And will be ignored.
 372  */
 373 
 374 void
 375 check_timeouts()
 376 {
 377         set_t                   setno;
 378         time_t                  now, then;
 379         mutex_t                 *mx;
 380         md_mn_msgclass_t        class;
 381 
 382         for (; ; ) {
 383                 now = time((time_t *)NULL);
 384                 for (setno = 1; setno < MD_MAXSETS; setno++) {
 385                         if (md_mn_set_inited[setno] != MDMN_SET_READY) {
 386                                 continue;
 387                         }
 388                         for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES;
 389                             class++) {
 390                                 mx = mdmn_get_initiator_table_mx(setno, class);
 391                                 (void) mutex_lock(mx);
 392 
 393                                 /* then is the registered time */
 394                                 then =
 395                                     mdmn_get_initiator_table_time(setno, class);
 396                                 if ((then != 0) && (now > then)) {
 397                                         timeout_initiator(setno, class);
 398                                 }
 399                                 (void) mutex_unlock(mx);
 400                         }
 401                 }
 402                 /* it's ok to check only once per second */
 403                 (void) sleep(1);
 404 
 405                 /* is there work to do? */
 406                 (void) mutex_lock(&check_timeout_mutex);
 407                 if (messages_on_their_way == 0) {
 408                         (void) cond_wait(&check_timeout_cv,
 409                             &check_timeout_mutex);
 410                 }
 411                 (void) mutex_unlock(&check_timeout_mutex);
 412         }
 413 }
 414 
 415 void
 416 setup_debug(void)
 417 {
 418         char    *tmp_dir;
 419 
 420         /* Read in the debug-controlling tokens from runtime.cf */
 421         md_commd_global_verb = commd_get_verbosity();
 422         /*
 423          * If the user didn't specify a verbosity level in runtime.cf
 424          * we can safely return here. As we don't intend to printout
 425          * debug messages, we don't need to check for the output file.
 426          */
 427         if (md_commd_global_verb == 0) {
 428                 return;
 429         }
 430 
 431         /* if commdout is non-NULL it is an open FILE, we'd better close it */
 432         if (commdout != (FILE *)NULL) {
 433                 (void) fclose(commdout);
 434         }
 435 
 436         commdoutfile = commd_get_outfile();
 437 
 438         /* setup the debug output */
 439         if (commdoutfile == (char *)NULL) {
 440                 /* if no valid file was specified, use the default */
 441                 commdoutfile = "/var/run/commd.out";
 442                 commdout = fopen(commdoutfile, "a");
 443         } else {
 444                 /* check if the directory exists and is writable */
 445                 tmp_dir = strdup(commdoutfile);
 446                 if ((access(dirname(tmp_dir), X_OK|W_OK)) ||
 447                     ((commdout = fopen(commdoutfile, "a")) == (FILE *)NULL)) {
 448                         syslog(LOG_ERR,
 449                             "Can't write to specified output file %s,\n"
 450                             "using /var/run/commd.out instead\n", commdoutfile);
 451                         free(commdoutfile);
 452                         commdoutfile = "/var/run/commd.out";
 453                         commdout = fopen(commdoutfile, "a");
 454                 }
 455                 free(tmp_dir);
 456         }
 457 
 458         if (commdout == (FILE *)NULL) {
 459                 syslog(LOG_ERR, "Can't write to debug output file %s\n",
 460                     commdoutfile);
 461         }
 462 }
 463 
 464 /*
 465  * mdmn_is_node_dead checks to see if a node is dead using
 466  * the SunCluster infrastructure which is a stable interface.
 467  * If unable to contact SunCuster the node is assumed to be alive.
 468  * Return values:
 469  *      1 - node is dead
 470  *      0 - node is alive
 471  */
 472 int
 473 mdmn_is_node_dead(md_mnnode_desc *node)
 474 {
 475         char    *fmt = "/usr/cluster/bin/scha_cluster_get -O NODESTATE_NODE ";
 476         char    *cmd;
 477         size_t  size;
 478         char    buf[10];
 479         FILE    *ptr;
 480         int     retval = 0;
 481 
 482         /* I know that I'm alive */
 483         if (strcmp(node->nd_nodename, mynode()) == 0)
 484                 return (retval);
 485 
 486         size = strlen(fmt) + strlen(node->nd_nodename) + 1;
 487         cmd = Zalloc(size);
 488         (void) strlcat(cmd, fmt, size);
 489         (void) strlcat(cmd, node->nd_nodename, size);
 490 
 491         if ((ptr = popen(cmd, "r")) != NULL) {
 492                 if (fgets(buf, sizeof (buf), ptr) != NULL) {
 493                         /* If scha_cluster_get returned DOWN - return dead */
 494                         if (strncmp(buf, "DOWN", 4) == 0)
 495                                 retval = 1;
 496                 }
 497                 (void) pclose(ptr);
 498         }
 499         Free(cmd);
 500         return (retval);
 501 }
 502 
 503 /*
 504  * global_init()
 505  *
 506  * Perform some global initializations.
 507  *
 508  * the following routines have to call this before operation can start:
 509  *  - mdmn_send_svc_2
 510  *  - mdmn_work_svc_2
 511  *  - mdmn_comm_lock_svc_2
 512  *  - mdmn_comm_unlock_svc_2
 513  *  - mdmn_comm_suspend_svc_2
 514  *  - mdmn_comm_resume_svc_2
 515  *  - mdmn_comm_reinit_set_svc_2
 516  *
 517  * This is a single threaded daemon, so it can only be in one of the above
 518  * routines at the same time.
 519  * This means, global_init() cannot be called more than once at the same time.
 520  * Hence, no lock is needed.
 521  */
 522 void
 523 global_init(void)
 524 {
 525         set_t                   set;
 526         md_mn_msgclass_t        class;
 527         struct sigaction        sighandler;
 528         time_t                  clock_val;
 529         struct rlimit           commd_limit;
 530 
 531 
 532 
 533         /* Do these global initializations only once */
 534         if (md_commd_global_state & MD_CGS_INITED) {
 535                 return;
 536         }
 537         (void) sdssc_bind_library();
 538 
 539         /* setup the debug options from the config file */
 540         setup_debug();
 541 
 542         /* make sure that we don't run out of file descriptors */
 543         commd_limit.rlim_cur = commd_limit.rlim_max = RLIM_INFINITY;
 544         if (setrlimit(RLIMIT_NOFILE, &commd_limit) != 0) {
 545                 syslog(LOG_WARNING, gettext("setrlimit failed."
 546                     "Could not increase the max file descriptors"));
 547         }
 548 
 549         /* Make setup_debug() be the action in case of SIGHUP */
 550         sighandler.sa_flags = 0;
 551         (void) sigfillset(&sighandler.sa_mask);
 552         sighandler.sa_handler = (void (*)(int)) setup_debug;
 553         (void) sigaction(SIGHUP, &sighandler, NULL);
 554 
 555         __savetime = gethrtime();
 556         (void) time(&clock_val);
 557         commd_debug(MD_MMV_MISC, "global init called %s\n", ctime(&clock_val));
 558 
 559         /* start a thread that flushes out the debug on a regular basis */
 560         (void) thr_create(NULL, 0, (void *(*)(void *))flush_fcout,
 561             (void *) NULL, THR_DETACHED, NULL);
 562 
 563         /* global rwlock's / mutex's / cond_t's go here */
 564         (void) mutex_init(&check_timeout_mutex, USYNC_THREAD, NULL);
 565         (void) cond_init(&check_timeout_cv, USYNC_THREAD, NULL);
 566         (void) mutex_init(&get_setdesc_mutex, USYNC_THREAD, NULL);
 567 
 568         /* Make sure the initiator table is initialized correctly */
 569         for (set = 0; set < MD_MAXSETS; set++) {
 570                 for (class = 0; class < MD_MN_NCLASSES; class++) {
 571                         mdmn_unregister_initiator_table(set, class);
 572                 }
 573         }
 574 
 575 
 576         /* setup the check for timeouts */
 577         (void) thr_create(NULL, 0, (void *(*)(void *))check_timeouts,
 578             (void *) NULL, THR_DETACHED, NULL);
 579 
 580         md_commd_global_state |= MD_CGS_INITED;
 581 }
 582 
 583 
 584 /*
 585  * mdmn_init_client(setno, nodeid)
 586  * called if client[setno][nodeid] is NULL
 587  *
 588  * NOTE: Must be called with set_desc_rwlock held as a reader
 589  * NOTE: Must be called with client_rwlock held as a writer
 590  *
 591  * If the rpc client for this node has not been setup for any set, we do it now.
 592  *
 593  * Returns      0 on success (node found in set, rpc client setup)
 594  *              -1 if metaget_setdesc failed,
 595  *              -2 if node not part of set
 596  *              -3 if clnt_create fails
 597  */
 598 static int
 599 mdmn_init_client(set_t setno, md_mn_nodeid_t nid)
 600 {
 601         md_error_t      ep = mdnullerror;
 602         md_mnnode_desc  *node;
 603         md_set_desc     *sd;    /* just an abbr for set_descriptor[setno] */
 604 
 605         sd = set_descriptor[setno];
 606 
 607         /*
 608          * Is the appropriate set_descriptor already initialized ?
 609          * Can't think of a scenario where this is not the case, but we'd better
 610          * check for it anyway.
 611          */
 612         if (sd == NULL) {
 613                 mdsetname_t     *sp;
 614 
 615                 /* readlock -> writelock */
 616                 (void) rw_unlock(&set_desc_rwlock[setno]);
 617                 (void) rw_wrlock(&set_desc_rwlock[setno]);
 618                 sp = metasetnosetname(setno, &ep);
 619                 /* Only one thread is supposed to be in metaget_setdesc() */
 620                 (void) mutex_lock(&get_setdesc_mutex);
 621                 sd = metaget_setdesc(sp, &ep);
 622                 (void) mutex_unlock(&get_setdesc_mutex);
 623                 if (sd == NULL) {
 624                         /* back to ... */
 625                         (void) rw_unlock(&set_desc_rwlock[setno]);
 626                         /* ... readlock */
 627                         (void) rw_rdlock(&set_desc_rwlock[setno]);
 628                         return (-1);
 629                 }
 630                 set_descriptor[setno] = sd;
 631                 /* back to readlock */
 632                 (void) rw_unlock(&set_desc_rwlock[setno]);
 633                 (void) rw_rdlock(&set_desc_rwlock[setno]);
 634         }
 635 
 636         /* first we have to find the node name for this node id */
 637         for (node = sd->sd_nodelist; node; node = node->nd_next) {
 638                 if (node->nd_nodeid == nid)
 639                         break; /* we found our node in this set */
 640         }
 641 
 642 
 643         if (node == (md_mnnode_desc *)NULL) {
 644                 commd_debug(MD_MMV_SYSLOG,
 645                     "FATAL: node %d not found in set %d\n", nid, setno);
 646                 (void) rw_unlock(&set_desc_rwlock[setno]);
 647                 return (-2);
 648         }
 649 
 650         commd_debug(MD_MMV_INIT, "init: %s has the flags: 0x%x\n",
 651             node->nd_nodename ? node->nd_nodename : "NULL", node->nd_flags);
 652 
 653         /* Did this node join the diskset?  */
 654         if ((node->nd_flags & MD_MN_NODE_OWN) == 0) {
 655                 commd_debug(MD_MMV_INIT, "init: %s didn't join set %d\n",
 656                     node->nd_nodename ? node->nd_nodename : "NULL", setno);
 657                 (void) rw_unlock(&set_desc_rwlock[setno]);
 658                 return (-2);
 659         }
 660 
 661         /* if clnt_create has not been done for that node, do it now */
 662         if (client[setno][nid] == (CLIENT *) NULL) {
 663                 time_t  tout = 0;
 664 
 665                 /*
 666                  * While trying to create a connection to a node,
 667                  * periodically check to see if the node has been marked
 668                  * dead by the SunCluster infrastructure.
 669                  * This periodic check is needed since a non-responsive
 670                  * rpc.mdcommd (while it is attempting to create a connection
 671                  * to a dead node) can lead to large delays and/or failures
 672                  * in the reconfig steps.
 673                  */
 674                 while ((client[setno][nid] == (CLIENT *) NULL) &&
 675                     (tout < MD_CLNT_CREATE_TOUT)) {
 676                         client[setno][nid] = meta_client_create_retry(
 677                             node->nd_nodename, mdmn_clnt_create,
 678                             (void *) node, MD_CLNT_CREATE_SUBTIMEOUT, &ep);
 679                         /* Is the node dead? */
 680                         if (mdmn_is_node_dead(node) == 1) {
 681                                 commd_debug(MD_MMV_SYSLOG,
 682                                     "rpc.mdcommd: no client for dead node %s\n",
 683                                     node->nd_nodename);
 684                                 break;
 685                         } else
 686                                 tout += MD_CLNT_CREATE_SUBTIMEOUT;
 687                 }
 688 
 689                 if (client[setno][nid] == (CLIENT *) NULL) {
 690                         clnt_pcreateerror(node->nd_nodename);
 691                         (void) rw_unlock(&set_desc_rwlock[setno]);
 692                         return (-3);
 693                 }
 694                 /* this node has the license to send */
 695                 commd_debug(MD_MMV_MISC, "init_client: calling add_lic\n");
 696                 add_license(node);
 697 
 698                 /* set the timeout value */
 699                 clnt_control(client[setno][nid], CLSET_TIMEOUT,
 700                     (char *)&FOUR_SECS);
 701 
 702         }
 703         (void) rw_unlock(&set_desc_rwlock[setno]);
 704         return (0);
 705 }
 706 
 707 /*
 708  * check_client(setno, nodeid)
 709  *
 710  * must be called with reader lock held for set_desc_rwlock[setno]
 711  * and must be called with reader lock held for client_rwlock[setno]
 712  * Checks if the client for this set/node combination is already setup
 713  * if not it upgrades the lock to a writer lock
 714  * and tries to initialize the client.
 715  * Finally it's checked if the client nulled out again due to some race
 716  *
 717  * returns 0 if there is a usable client
 718  * returns MDMNE_RPC_FAIL otherwise
 719  */
 720 static int
 721 check_client(set_t setno, md_mn_nodeid_t nodeid)
 722 {
 723         int ret = 0;
 724 
 725         while ((client[setno][nodeid] == (CLIENT *)NULL) && (ret == 0)) {
 726                 /* upgrade reader ... */
 727                 (void) rw_unlock(&client_rwlock[setno]);
 728                 /* ... to writer lock. */
 729                 (void) rw_wrlock(&client_rwlock[setno]);
 730                 if (mdmn_init_client(setno, nodeid) != 0) {
 731                         ret = MDMNE_RPC_FAIL;
 732                 }
 733                 /* downgrade writer ... */
 734                 (void) rw_unlock(&client_rwlock[setno]);
 735                 /* ... back to reader lock. */
 736                 (void) rw_rdlock(&client_rwlock[setno]);
 737         }
 738         return (ret);
 739 }
 740 
 741 /*
 742  * mdmn_init_set(setno, todo)
 743  * setno is the number of the set to be initialized.
 744  * todo is one of the MDMN_SET_* thingies or MDMN_SET_READY
 745  * If called with MDMN_SET_READY everything is initialized.
 746  *
 747  * If the set mutexes are already initialized, the caller has to hold
 748  * both set_desc_rwlock[setno] and client_rwlock[setno] as a writer, before
 749  * calling mdmn_init_set()
 750  */
 751 int
 752 mdmn_init_set(set_t setno, int todo)
 753 {
 754         int class;
 755         md_mnnode_desc  *node;
 756         md_set_desc     *sd; /* just an abbr for set_descriptor[setno] */
 757         mdsetname_t     *sp;
 758         md_error_t      ep = mdnullerror;
 759         md_mn_nodeid_t  nid;
 760 
 761         /*
 762          * Check if we are told to setup the mutexes and
 763          * if these are not yet setup
 764          */
 765         if ((todo & MDMN_SET_MUTEXES) &&
 766             ((md_mn_set_inited[setno] & MDMN_SET_MUTEXES) == 0)) {
 767                 (void) mutex_init(&mdmn_busy_mutex[setno], USYNC_THREAD, NULL);
 768                 (void) cond_init(&mdmn_busy_cv[setno], USYNC_THREAD, NULL);
 769                 (void) rwlock_init(&client_rwlock[setno], USYNC_THREAD, NULL);
 770                 (void) rwlock_init(&set_desc_rwlock[setno], USYNC_THREAD, NULL);
 771 
 772                 for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
 773                         (void) mutex_init(mdmn_get_master_table_mx(setno,
 774                             class), USYNC_THREAD, NULL);
 775                         (void) cond_init(mdmn_get_master_table_cv(setno, class),
 776                             USYNC_THREAD, NULL);
 777                         (void) mutex_init(mdmn_get_initiator_table_mx(setno,
 778                             class), USYNC_THREAD, NULL);
 779                 }
 780                 md_mn_set_inited[setno] |= MDMN_SET_MUTEXES;
 781         }
 782         if ((todo & MDMN_SET_MCT) &&
 783             ((md_mn_set_inited[setno] & MDMN_SET_MCT) == 0)) {
 784                 int     fd;
 785                 size_t  filesize;
 786                 caddr_t addr;
 787                 char table_name[32];
 788                 struct flock    fl;
 789 
 790                 filesize = (sizeof (md_mn_mct_t));
 791                 (void) snprintf(table_name, sizeof (table_name), "%s%d",
 792                     MD_MN_MSG_COMP_TABLE, setno);
 793                 /*
 794                  * If the mct file exists we map it into memory.
 795                  * Otherwise we create an empty file of appropriate
 796                  * size and map that into memory.
 797                  * The mapped areas are stored in mct[setno].
 798                  */
 799                 fd = open(table_name, O_RDWR|O_CREAT|O_DSYNC, 0600);
 800                 if (fd < 0) {
 801                         commd_debug(MD_MMV_MISC,
 802                             "init_set: Can't open MCT\n");
 803                         return (-1);
 804                 }
 805                 /*
 806                  * Ensure that we are the only process that has this file
 807                  * mapped. If another instance of rpc.mdcommd has beaten us
 808                  * then we display the failing process and attempt to terminate
 809                  * it. The next call of this routine should establish us as
 810                  * the only rpc.mdcommd on the system.
 811                  */
 812                 (void) memset(&fl, 0, sizeof (fl));
 813                 fl.l_type = F_WRLCK;
 814                 fl.l_whence = SEEK_SET;
 815                 fl.l_start = 0;
 816                 fl.l_len = filesize + 1;
 817 
 818                 if (fcntl(fd, F_SETLK, &fl) == -1) {
 819                         commd_debug(MD_MMV_SYSLOG,
 820                             "init_set: Cannot lock MCT '%s'\n", table_name);
 821                         if (fcntl(fd, F_GETLK, &fl) != -1) {
 822                                 commd_debug(MD_MMV_SYSLOG, "rpc.mdcommd:"
 823                                     "Process %d holds lock\n", fl.l_pid);
 824                                 (void) close(fd);
 825                         } else {
 826                                 commd_debug(MD_MMV_SYSLOG, "rpc.mdcommd:"
 827                                     "F_GETLK failed\n");
 828                                 (void) close(fd);
 829                                 return (-1);
 830                         }
 831 
 832                         /*
 833                          * Try to terminate other mdcommd process so that we
 834                          * can establish ourselves.
 835                          */
 836                         if (sigsend(P_PID, fl.l_pid, 0) == 0) {
 837                                 if (sigsend(P_PID, fl.l_pid, SIGKILL) < 0) {
 838                                         commd_debug(MD_MMV_SYSLOG,
 839                                             "rpc.mdcommd:"
 840                                             "SIGKILL of %d failed\n", fl.l_pid);
 841                                 } else {
 842                                         commd_debug(MD_MMV_SYSLOG,
 843                                             "rpc.mdcommd:"
 844                                             "Process %d killed\n", fl.l_pid);
 845                                 }
 846                         } else {
 847                                 commd_debug(MD_MMV_SYSLOG, "rpc.mdcommd:"
 848                                     "Process %d not killable\n", fl.l_pid);
 849                         }
 850                         return (-1);
 851                 }
 852                 /*
 853                  * To ensure that the file has the appropriate size,
 854                  * we write a byte at the end of the file.
 855                  */
 856                 (void) lseek(fd, filesize + 1, SEEK_SET);
 857                 (void) write(fd, "\0", 1);
 858 
 859                 /* at this point we have a file in place that we can mmap */
 860                 addr = mmap(0, filesize, PROT_READ | PROT_WRITE,
 861                     MAP_SHARED, fd, (off_t)0);
 862                 if (addr == MAP_FAILED) {
 863                         commd_debug(MD_MMV_INIT,
 864                             "init_set: mmap mct error %d\n",
 865                             errno);
 866                         return (-1);
 867                 }
 868                 /* LINTED pointer alignment */
 869                 mct[setno] = (md_mn_mct_t *)addr;
 870 
 871                 /* finally we initialize the mutexes that protect the mct */
 872                 for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
 873                         (void) mutex_init(&(mct_mutex[setno][class]),
 874                             USYNC_THREAD, NULL);
 875                 }
 876 
 877                 md_mn_set_inited[setno] |= MDMN_SET_MCT;
 878         }
 879         /*
 880          * Check if we are told to setup the nodes and
 881          * if these are not yet setup
 882          * (Attention: negative logic here compared to above!)
 883          */
 884         if (((todo & MDMN_SET_NODES) == 0) ||
 885             (md_mn_set_inited[setno] & MDMN_SET_NODES)) {
 886                 return (0); /* success */
 887         }
 888 
 889         if ((sp = metasetnosetname(setno, &ep)) == NULL) {
 890                 commd_debug(MD_MMV_SYSLOG,
 891                     "metasetnosetname(%d) returned NULL\n", setno);
 892                 return (MDMNE_NOT_JOINED);
 893         }
 894 
 895         /* flush local copy of rpc.metad data */
 896         metaflushsetname(sp);
 897 
 898         (void) mutex_lock(&get_setdesc_mutex);
 899         sd = metaget_setdesc(sp, &ep);
 900         (void) mutex_unlock(&get_setdesc_mutex);
 901 
 902         if (sd == NULL) {
 903                 commd_debug(MD_MMV_SYSLOG,
 904                     "metaget_setdesc(%d) returned NULL\n", setno);
 905                 return (MDMNE_NOT_JOINED);
 906         }
 907 
 908         /*
 909          * if this set is not a multinode set or
 910          * this node didn't join yet the diskset, better don't do anything
 911          */
 912         if ((MD_MNSET_DESC(sd) == 0) ||
 913             (sd->sd_mn_mynode->nd_flags & MD_MN_NODE_OWN) == 0) {
 914                 commd_debug(MD_MMV_INIT, "didn't yet join set %d\n", setno);
 915                 return (MDMNE_NOT_JOINED);
 916         }
 917 
 918         for (node = sd->sd_nodelist; node != NULL; node = node->nd_next) {
 919                 time_t  tout = 0;
 920                 nid = node->nd_nodeid;
 921 
 922                 commd_debug(MD_MMV_INIT,
 923                     "setting up: node=%s, priv_ic=%s, flags=0x%x\n",
 924                     node->nd_nodename ? node->nd_nodename : "NULL",
 925                     node->nd_priv_ic ? node->nd_priv_ic : "NULL",
 926                     node->nd_flags);
 927 
 928                 if ((node->nd_flags & MD_MN_NODE_OWN) == 0) {
 929                         commd_debug(MD_MMV_INIT,
 930                             "init: %s didn't join set %d\n",
 931                             node->nd_nodename ? node->nd_nodename : "NULL",
 932                             setno);
 933                         continue;
 934                 }
 935 
 936                 if (client[setno][nid] != (CLIENT *) NULL) {
 937                         /* already inited */
 938                         commd_debug(MD_MMV_INIT, "init: already: node=%s\n",
 939                             node->nd_nodename ? node->nd_nodename : "NULL");
 940                         continue;
 941                 }
 942 
 943                 /*
 944                  * While trying to create a connection to a node,
 945                  * periodically check to see if the node has been marked
 946                  * dead by the SunCluster infrastructure.
 947                  * This periodic check is needed since a non-responsive
 948                  * rpc.mdcommd (while it is attempting to create a connection
 949                  * to a dead node) can lead to large delays and/or failures
 950                  * in the reconfig steps.
 951                  */
 952                 while ((client[setno][nid] == (CLIENT *) NULL) &&
 953                     (tout < MD_CLNT_CREATE_TOUT)) {
 954                         client[setno][nid] = meta_client_create_retry(
 955                             node->nd_nodename, mdmn_clnt_create,
 956                             (void *) node, MD_CLNT_CREATE_SUBTIMEOUT, &ep);
 957                         /* Is the node dead? */
 958                         if (mdmn_is_node_dead(node) == 1) {
 959                                 commd_debug(MD_MMV_SYSLOG,
 960                                     "rpc.mdcommd: no client for dead node %s\n",
 961                                     node->nd_nodename);
 962                                 break;
 963                         } else
 964                                 tout += MD_CLNT_CREATE_SUBTIMEOUT;
 965                 }
 966 
 967                 if (client[setno][nid] == (CLIENT *) NULL) {
 968                         clnt_pcreateerror(node->nd_nodename);
 969                         /*
 970                          * If we cannot connect to a single node
 971                          * (maybe because it is down) we mark this node as not
 972                          * owned and continue with the next node in the list.
 973                          * This is better than failing the entire starting up
 974                          * of the commd system.
 975                          */
 976                         node->nd_flags &= ~MD_MN_NODE_OWN;
 977                         commd_debug(MD_MMV_SYSLOG,
 978                             "WARNING couldn't create client for %s\n"
 979                             "Reconfig cycle required\n",
 980                             node->nd_nodename);
 981                         commd_debug(MD_MMV_INIT,
 982                             "WARNING couldn't create client for %s\n"
 983                             "Reconfig cycle required\n",
 984                             node->nd_nodename);
 985                         continue;
 986                 }
 987                 /* this node has the license to send */
 988                 commd_debug(MD_MMV_MISC, "init_set: calling add_lic\n");
 989                 add_license(node);
 990 
 991                 /* set the timeout value */
 992                 clnt_control(client[setno][nid], CLSET_TIMEOUT,
 993                     (char *)&FOUR_SECS);
 994 
 995                 commd_debug(MD_MMV_INIT, "init: done: node=%s\n",
 996                     node->nd_nodename ? node->nd_nodename : "NULL");
 997         }
 998 
 999         set_descriptor[setno] = sd;
1000         md_mn_set_inited[setno] |= MDMN_SET_NODES;
1001         return (0); /* success */
1002 }
1003 
1004 void *
1005 mdmn_send_to_work(void *arg)
1006 {
1007         int                     *rpc_err = NULL;
1008         int                     success;
1009         int                     try_master;
1010         set_t                   setno;
1011         mutex_t                 *mx;    /* protection for initiator_table */
1012         SVCXPRT                 *transp;
1013         md_mn_msg_t             *msg;
1014         md_mn_nodeid_t          set_master;
1015         md_mn_msgclass_t        class;
1016         md_mn_msg_and_transp_t  *matp = (md_mn_msg_and_transp_t *)arg;
1017 
1018         msg                     = matp->mat_msg;
1019         transp                  = matp->mat_transp;
1020 
1021         class = mdmn_get_message_class(msg->msg_type);
1022         setno = msg->msg_setno;
1023 
1024         /* set the sender, so the master knows who to send the results */
1025         (void) rw_rdlock(&set_desc_rwlock[setno]);
1026         msg->msg_sender = set_descriptor[setno]->sd_mn_mynode->nd_nodeid;
1027         set_master      = set_descriptor[setno]->sd_mn_master_nodeid;
1028 
1029         mx = mdmn_get_initiator_table_mx(setno, class);
1030         (void) mutex_lock(mx);
1031 
1032         /*
1033          * Here we check, if the initiator table slot for this set/class
1034          * combination is free to use.
1035          * If this is not the case, we return CLASS_BUSY forcing the
1036          * initiating send_message call to retry
1037          */
1038         success = mdmn_check_initiator_table(setno, class);
1039         if (success == MDMNE_CLASS_BUSY) {
1040                 md_mn_msgid_t           active_mid;
1041 
1042                 mdmn_get_initiator_table_id(setno, class, &active_mid);
1043 
1044                 commd_debug(MD_MMV_SEND,
1045                     "send_to_work: received but locally busy "
1046                     "(%d, 0x%llx-%d), set=%d, class=%d, type=%d, "
1047                     "active msg=(%d, 0x%llx-%d)\n",
1048                     MSGID_ELEMS(msg->msg_msgid), setno, class,
1049                     msg->msg_type, MSGID_ELEMS(active_mid));
1050         } else {
1051                 commd_debug(MD_MMV_SEND,
1052                     "send_to_work: received (%d, 0x%llx-%d), "
1053                     "set=%d, class=%d, type=%d\n",
1054                     MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type);
1055         }
1056 
1057         try_master = 2; /* return failure after two retries */
1058         while ((success == MDMNE_ACK) && (try_master--)) {
1059                 (void) rw_rdlock(&client_rwlock[setno]);
1060                 /* is the rpc client to the master still around ? */
1061                 if (check_client(setno, set_master)) {
1062                         success = MDMNE_RPC_FAIL;
1063                         FLUSH_DEBUGFILE();
1064                         (void) rw_unlock(&client_rwlock[setno]);
1065                         break; /* out of try_master-loop */
1066                 }
1067 
1068                 /*
1069                  * Send the request to the work function on the master
1070                  * this call will return immediately
1071                  */
1072                 rpc_err = mdmn_work_2(msg, client[setno][set_master],
1073                     set_master);
1074 
1075                 /* Everything's Ok? */
1076                 if (rpc_err == NULL) {
1077                         success = MDMNE_RPC_FAIL;
1078                         /*
1079                          * Probably something happened to the daemon on the
1080                          * master. Kill the client, and try again...
1081                          */
1082                         (void) rw_unlock(&client_rwlock[setno]);
1083                         (void) rw_wrlock(&client_rwlock[setno]);
1084                         mdmn_clnt_destroy(client[setno][set_master]);
1085                         if (client[setno][set_master] != (CLIENT *)NULL) {
1086                                 client[setno][set_master] = (CLIENT *)NULL;
1087                         }
1088                         (void) rw_unlock(&client_rwlock[setno]);
1089                         continue;
1090 
1091                 } else  if (*rpc_err != MDMNE_ACK) {
1092                         /* something went wrong, break out */
1093                         success = *rpc_err;
1094                         free(rpc_err);
1095                         (void) rw_unlock(&client_rwlock[setno]);
1096                         break; /* out of try_master-loop */
1097                 }
1098 
1099                 (void) rw_unlock(&client_rwlock[setno]);
1100                 free(rpc_err);
1101 
1102                 /*
1103                  * If we are here, we sucessfully delivered the message.
1104                  * We register the initiator_table, so that
1105                  * wakeup_initiator_2 can do the sendreply with the
1106                  * results for us.
1107                  */
1108                 success = MDMNE_ACK;
1109                 mdmn_register_initiator_table(setno, class, msg, transp);
1110 
1111                 /* tell check_timeouts, there's work to do */
1112                 (void) mutex_lock(&check_timeout_mutex);
1113                 messages_on_their_way++;
1114                 (void) cond_signal(&check_timeout_cv);
1115                 (void) mutex_unlock(&check_timeout_mutex);
1116                 break; /* out of try_master-loop */
1117         }
1118 
1119         (void) rw_unlock(&set_desc_rwlock[setno]);
1120 
1121         if (success == MDMNE_ACK) {
1122                 commd_debug(MD_MMV_SEND,
1123                     "send_to_work: registered (%d, 0x%llx-%d)\n",
1124                     MSGID_ELEMS(msg->msg_msgid));
1125         } else {
1126                 /* In case of failure do the sendreply now */
1127                 md_mn_result_t *resultp;
1128                 resultp = Zalloc(sizeof (md_mn_result_t));
1129                 resultp->mmr_comm_state = success;
1130                 /*
1131                  * copy the MSGID so that we know _which_ message
1132                  * failed (if the transp has got mangled)
1133                  */
1134                 MSGID_COPY(&(msg->msg_msgid), &(resultp->mmr_msgid));
1135                 mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
1136                 commd_debug(MD_MMV_SEND,
1137                     "send_to_work: not registered (%d, 0x%llx-%d) cs=%d\n",
1138                     MSGID_ELEMS(msg->msg_msgid), success);
1139                 free_result(resultp);
1140                 /*
1141                  * We don't have a timeout registered to wake us up, so we're
1142                  * now done with this handle. Release it back to the pool.
1143                  */
1144                 svc_done(transp);
1145 
1146         }
1147 
1148         free_msg(msg);
1149         /* the alloc was done in mdmn_send_svc_2 */
1150         Free(matp);
1151         (void) mutex_unlock(mx);
1152         return (NULL);
1153 
1154 }
1155 
1156 /*
1157  * do_message_locally(msg, result)
1158  * Process a message locally on the master
1159  * Lookup the MCT if the message has already been processed.
1160  * If not, call the handler and store the result
1161  * If yes, retrieve the result from the MCT.
1162  * Return:
1163  *      MDMNE_ACK in case of success
1164  *      MDMNE_LOG_FAIL if the MCT could not be checked
1165  */
1166 static int
1167 do_message_locally(md_mn_msg_t *msg, md_mn_result_t *result)
1168 {
1169         int                     completed;
1170         set_t                   setno;
1171         md_mn_msgtype_t         msgtype = msg->msg_type;
1172         md_mn_msgclass_t        class;
1173 
1174         void (*handler)(md_mn_msg_t *msg, uint_t flags, md_mn_result_t *res);
1175 
1176         handler = mdmn_get_handler(msgtype);
1177         if (handler == NULL) {
1178                 result->mmr_exitval = 0;
1179                 /* let the sender decide if this is an error or not */
1180                 result->mmr_comm_state = MDMNE_NO_HANDLER;
1181                 return (MDMNE_NO_HANDLER);
1182         }
1183 
1184         class = mdmn_get_message_class(msg->msg_type);
1185         setno = msg->msg_setno;
1186 
1187         result->mmr_msgtype  = msgtype;
1188         result->mmr_flags    = msg->msg_flags;
1189         MSGID_COPY(&(msg->msg_msgid), &(result->mmr_msgid));
1190 
1191         (void) mutex_lock(&mct_mutex[setno][class]);
1192         completed = mdmn_check_completion(msg, result);
1193         if (completed == MDMN_MCT_NOT_DONE) {
1194                 /* message not yet processed locally */
1195                 commd_debug(MD_MMV_PROC_M, "proc_mas: "
1196                     "calling handler for (%d,0x%llx-%d) type %d\n",
1197                     MSGID_ELEMS(msg->msg_msgid), msgtype);
1198 
1199                 /*
1200                  * Mark the message as being currently processed,
1201                  * so we won't start a second handler for it
1202                  */
1203                 (void) mdmn_mark_completion(msg, NULL, MDMN_MCT_IN_PROGRESS);
1204                 (void) mutex_unlock(&mct_mutex[setno][class]);
1205 
1206                 /* here we actually process the message on the master */
1207                 (*handler)(msg, MD_MSGF_ON_MASTER, result);
1208 
1209                 commd_debug(MD_MMV_PROC_M, "proc_mas: "
1210                     "finished handler for (%d,0x%llx-%d) type %d\n",
1211                     MSGID_ELEMS(msg->msg_msgid), msgtype);
1212 
1213                 /* Mark the message as fully processed, store the result */
1214                 (void) mutex_lock(&mct_mutex[setno][class]);
1215                 (void) mdmn_mark_completion(msg, result, MDMN_MCT_DONE);
1216         } else if (completed == MDMN_MCT_DONE) {
1217                 commd_debug(MD_MMV_PROC_M, "proc_mas: "
1218                     "result for (%d, 0x%llx-%d) from MCT\n",
1219                     MSGID_ELEMS(msg->msg_msgid), msgtype);
1220         } else if (completed == MDMN_MCT_IN_PROGRESS) {
1221                 commd_debug(MD_MMV_PROC_M, "proc_mas: "
1222                     "(%d, 0x%llx-%d) is currently being processed\n",
1223                     MSGID_ELEMS(msg->msg_msgid), msgtype);
1224         } else {
1225                 /* MCT error occurred (should never happen) */
1226                 (void) mutex_unlock(&mct_mutex[setno][class]);
1227                 result->mmr_comm_state = MDMNE_LOG_FAIL;
1228                 commd_debug(MD_MMV_SYSLOG, "WARNING "
1229                     "mdmn_check_completion returned %d "
1230                     "for (%d,0x%llx-%d)\n", completed,
1231                     MSGID_ELEMS(msg->msg_msgid));
1232                 return (MDMNE_LOG_FAIL);
1233         }
1234         (void) mutex_unlock(&mct_mutex[setno][class]);
1235         return (MDMNE_ACK);
1236 
1237 }
1238 
1239 /*
1240  * do_send_message(msg, node)
1241  *
1242  * Send a message to a given node and wait for a acknowledgment, that the
1243  * message has arrived on the remote node.
1244  * Make sure that the client for the set is setup correctly.
1245  * If no ACK arrives, destroy and recreate the RPC client and retry the
1246  * message one time
1247  * After actually sending wait no longer than the appropriate number of
1248  * before timing out the message.
1249  *
1250  * Note must be called with set_desc_wrlock held in reader mode
1251  */
1252 static int
1253 do_send_message(md_mn_msg_t *msg, md_mnnode_desc *node)
1254 {
1255         int                     err;
1256         int                     rpc_retries;
1257         int                     timeout_retries = 0;
1258         int                     *ret = NULL;
1259         set_t                   setno;
1260         cond_t                  *cv;    /* see mdmn_wakeup_master_svc_2 */
1261         mutex_t                 *mx;    /* protection for class_busy */
1262         timestruc_t             timeout; /* surveillance for remote daemon */
1263         md_mn_nodeid_t          nid;
1264         md_mn_msgtype_t         msgtype;
1265         md_mn_msgclass_t        class;
1266 
1267         nid     = node->nd_nodeid;
1268         msgtype = msg->msg_type;
1269         setno   = msg->msg_setno;
1270         class   = mdmn_get_message_class(msgtype);
1271         mx      = mdmn_get_master_table_mx(setno, class);
1272         cv      = mdmn_get_master_table_cv(setno, class);
1273 
1274 retry_rpc:
1275 
1276         /* We try two times to send the message */
1277         rpc_retries = 2;
1278 
1279         /*
1280          * if sending the message doesn't succeed the first time due to a
1281          * RPC problem, we retry one time
1282          */
1283         while ((rpc_retries != 0) && (ret == NULL)) {
1284                 /*  in abort state, we error out immediately */
1285                 if (md_commd_global_state & MD_CGS_ABORTED) {
1286                         return (MDMNE_ABORT);
1287                 }
1288 
1289                 (void) rw_rdlock(&client_rwlock[setno]);
1290                 /* unable to create client? Ignore it */
1291                 if (check_client(setno, nid)) {
1292                         /*
1293                          * In case we cannot establish an RPC client, we
1294                          * take this node out of our considerations.
1295                          * This will be reset by a reconfig
1296                          * cycle that should come pretty soon.
1297                          * MNISSUE: Should a reconfig cycle
1298                          * be forced on SunCluster?
1299                          */
1300                         node->nd_flags &= ~MD_MN_NODE_OWN;
1301                         commd_debug(MD_MMV_SYSLOG,
1302                             "WARNING couldn't create client for %s\n"
1303                             "Reconfig cycle required\n",
1304                             node->nd_nodename);
1305                         commd_debug(MD_MMV_PROC_M, "proc_mas: (%d,0x%llx-%d) "
1306                             "WARNING couldn't create client for %s\n",
1307                             MSGID_ELEMS(msg->msg_msgid), node->nd_nodename);
1308                         (void) rw_unlock(&client_rwlock[setno]);
1309                         return (MDMNE_IGNORE_NODE);
1310                 }
1311                 /* let's be paranoid and check again before sending */
1312                 if (client[setno][nid] == NULL) {
1313                         /*
1314                          * if this is true, strange enough, we catch our breath,
1315                          * and then continue, so that the client is set up
1316                          * once again.
1317                          */
1318                         commd_debug(MD_MMV_PROC_M, "client is NULL\n");
1319                         (void) rw_unlock(&client_rwlock[setno]);
1320                         (void) sleep(1);
1321                         continue;
1322                 }
1323 
1324                 /* send it over, it will return immediately */
1325                 ret = mdmn_work_2(msg, client[setno][nid], nid);
1326 
1327                 (void) rw_unlock(&client_rwlock[setno]);
1328 
1329                 if (ret != NULL) {
1330                         commd_debug(MD_MMV_PROC_M,
1331                             "proc_mas: sending (%d,0x%llx-%d) to %d returned "
1332                             " 0x%x\n",
1333                             MSGID_ELEMS(msg->msg_msgid), nid, *ret);
1334                 } else {
1335                         commd_debug(MD_MMV_PROC_M,
1336                             "proc_mas: sending (%d,0x%llx-%d) to %d returned "
1337                             " NULL \n",
1338                             MSGID_ELEMS(msg->msg_msgid), nid);
1339                 }
1340 
1341                 if ((ret == NULL) || (*ret == MDMNE_CANNOT_CONNECT) ||
1342                     (*ret == MDMNE_THR_CREATE_FAIL)) {
1343                         /*
1344                          * Something happened to the daemon on the other side.
1345                          * Kill the client, and try again.
1346                          * check_client() will create a new client
1347                          */
1348                         (void) rw_wrlock(&client_rwlock[setno]);
1349                         mdmn_clnt_destroy(client[setno][nid]);
1350                         if (client[setno][nid] != (CLIENT *)NULL) {
1351                                 client[setno][nid] = (CLIENT *)NULL;
1352                         }
1353                         (void) rw_unlock(&client_rwlock[setno]);
1354 
1355                         /* ... but don't try infinitely */
1356                         --rpc_retries;
1357                         continue;
1358                 }
1359                 /*
1360                  * If the class is locked on the other node, keep trying.
1361                  * This situation will go away automatically,
1362                  * if we wait long enough
1363                  */
1364                 if (*ret == MDMNE_CLASS_LOCKED) {
1365                         (void) sleep(1);
1366                         free(ret);
1367                         ret = NULL;
1368                         continue;
1369                 }
1370         }
1371         if (ret == NULL) {
1372                 return (MDMNE_RPC_FAIL);
1373         }
1374 
1375 
1376         /* if the slave is in abort state, we just ignore it. */
1377         if (*ret == MDMNE_ABORT) {
1378                 commd_debug(MD_MMV_PROC_M,
1379                     "proc_mas: work(%d,0x%llx-%d) returned "
1380                     "MDMNE_ABORT\n",
1381                     MSGID_ELEMS(msg->msg_msgid));
1382                 free(ret);
1383                 return (MDMNE_IGNORE_NODE);
1384         }
1385 
1386         /* Did the remote processing succeed? */
1387         if (*ret != MDMNE_ACK) {
1388                 /*
1389                  * Some commd failure in the middle of sending the msg
1390                  * to the nodes. We don't continue here.
1391                  */
1392                 commd_debug(MD_MMV_PROC_M,
1393                     "proc_mas: work(%d,0x%llx-%d) returns %d\n",
1394                     MSGID_ELEMS(msg->msg_msgid), *ret);
1395                 free(ret);
1396                 return (MDMNE_RPC_FAIL);
1397         }
1398         free(ret);
1399         ret = NULL;
1400 
1401         /*
1402          * When we are here, we have sent the message to the other node and
1403          * we know that node has accepted it.
1404          * We go to sleep and have trust to be woken up by wakeup.
1405          * If we wakeup due to a timeout, or a signal, no result has been
1406          * placed in the appropriate slot.
1407          * If we timeout, it is likely that this is because the node has
1408          * gone away, so we will destroy the client and try it again in the
1409          * expectation that the rpc will fail and we will return
1410          * MDMNE_IGNORE_NODE. If that is not the case, the message must still
1411          * be being processed on the slave. In this case just timeout for 4
1412          * more seconds and then return RPC_FAIL if the message is not complete.
1413          */
1414         timeout.tv_nsec = 0;
1415         timeout.tv_sec = (timeout_retries == 0) ? mdmn_get_timeout(msgtype) :
1416             FOUR_SECS.tv_sec;
1417         err = cond_reltimedwait(cv, mx, &timeout);
1418 
1419         if (err == 0) {
1420                 /* everything's fine, return success */
1421                 return (MDMNE_ACK);
1422         }
1423 
1424         if (err == ETIME) {
1425                 commd_debug(MD_MMV_PROC_M, "proc_mas: "
1426                     "timeout occured, set=%d, class=%d, "
1427                     "msgid=(%d, 0x%llx-%d), timeout_retries=%d\n",
1428                     setno, class, MSGID_ELEMS(msg->msg_msgid), timeout_retries);
1429                 if (timeout_retries == 0) {
1430                         timeout_retries++;
1431                         /*
1432                          * Destroy the client and try the rpc call again
1433                          */
1434                         (void) rw_wrlock(&client_rwlock[setno]);
1435                         mdmn_clnt_destroy(client[setno][nid]);
1436                         client[setno][nid] = (CLIENT *)NULL;
1437                         (void) rw_unlock(&client_rwlock[setno]);
1438                         goto retry_rpc;
1439                 }
1440         } else if (err == EINTR) {
1441                 commd_debug(MD_MMV_PROC_M, "proc_mas: "
1442                     "commd signalled, set=%d, class=%d, "
1443                     "msgid=(%d, 0x%llx-%d)\n",
1444                     setno, class, MSGID_ELEMS(msg->msg_msgid));
1445         } else {
1446                 commd_debug(MD_MMV_PROC_M, "proc_mas: "
1447                     "cond_reltimedwait err=%d, set=%d, "
1448                     "class=%d, msgid=(%d, 0x%llx-%d)\n",
1449                     err, setno, class,
1450                     MSGID_ELEMS(msg->msg_msgid));
1451         }
1452 
1453         /* some failure happened */
1454         return (MDMNE_RPC_FAIL);
1455 }
1456 
1457 /*
1458  * before we return we have to
1459  * free_msg(msg); because we are working on a copied message
1460  */
1461 void
1462 mdmn_master_process_msg(md_mn_msg_t *msg)
1463 {
1464         int             *ret;
1465         int             err;
1466         int             nmsgs;          /* total number of msgs */
1467         int             curmsg;         /* index of current msg */
1468         set_t           setno;
1469         uint_t          inherit_flags = 0;
1470         uint_t          secdiff, usecdiff; /* runtime of this message */
1471         md_error_t      mde = mdnullerror;
1472         md_mn_msg_t     *msglist[MAX_SUBMESSAGES]; /* all msgs to process */
1473         md_mn_msg_t     *cmsg;          /* current msg */
1474         md_mn_msgid_t   dummyid;
1475         md_mn_result_t  *result;
1476         md_mn_result_t  *slave_result;
1477         md_mn_nodeid_t  sender;
1478         md_mn_nodeid_t  set_master;
1479         md_mnnode_desc  *node;
1480         md_mn_msgtype_t orig_type;      /* type of the original message */
1481         md_mn_msgtype_t msgtype;        /* type of the current message */
1482         md_mn_msgclass_t orig_class;    /* class of the original message */
1483         md_mn_msgclass_t class;         /* class of the current message */
1484 
1485         int (*smgen)(md_mn_msg_t *msg, md_mn_msg_t **msglist);
1486 
1487         orig_type = msgtype = msg->msg_type;
1488         sender  = msg->msg_sender;
1489         setno   = msg->msg_setno;
1490 
1491         result = Zalloc(sizeof (md_mn_result_t));
1492         result->mmr_setno    = setno;
1493         result->mmr_msgtype  = msgtype;
1494         MSGID_COPY(&(msg->msg_msgid), &(result->mmr_msgid));
1495 
1496         orig_class = mdmn_get_message_class(msgtype);
1497 
1498         commd_debug(MD_MMV_PROC_M,
1499             "proc_mas: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d\n",
1500             MSGID_ELEMS(msg->msg_msgid), setno, orig_class, msgtype);
1501 
1502         (void) rw_rdlock(&set_desc_rwlock[setno]);
1503         set_master = set_descriptor[setno]->sd_mn_master_nodeid;
1504         result->mmr_sender   = set_master;
1505         /*
1506          * Put message into the change log unless told otherwise
1507          * Note that we only log original messages.
1508          * If they are generated by some smgen, we don't log them!
1509          * Replay messages aren't logged either.
1510          * Note, that replay messages are unlogged on completion.
1511          */
1512         if ((msg->msg_flags & (MD_MSGF_NO_LOG | MD_MSGF_REPLAY_MSG)) == 0) {
1513                 commd_debug(MD_MMV_PROC_M,
1514                     "proc_mas: calling log_msg for (%d,0x%llx-%d) type %d\n",
1515                     MSGID_ELEMS(msg->msg_msgid), msgtype);
1516                 err = mdmn_log_msg(msg);
1517                 if (err == MDMNE_NULL) {
1518                         /* msg logged successfully */
1519                         commd_debug(MD_MMV_PROC_M, "proc_mas: "
1520                             "done log_msg for (%d,0x%llx-%d) type %d\n",
1521                             MSGID_ELEMS(msg->msg_msgid), msgtype);
1522                         goto proceed;
1523                 }
1524                 if (err == MDMNE_ACK) {
1525                         /* Same msg in the slot, proceed */
1526                         commd_debug(MD_MMV_PROC_M, "proc_mas: "
1527                             "already logged (%d,0x%llx-%d) type %d\n",
1528                             MSGID_ELEMS(msg->msg_msgid), msgtype);
1529                         goto proceed;
1530                 }
1531                 if (err == MDMNE_LOG_FAIL) {
1532                         /* Oh, bad, the log is non functional. */
1533                         result->mmr_comm_state = MDMNE_LOG_FAIL;
1534                         /*
1535                          * Note that the mark_busy was already done by
1536                          * mdmn_work_svc_2()
1537                          */
1538                         (void) mutex_lock(&mdmn_busy_mutex[setno]);
1539                         mdmn_mark_class_unbusy(setno, orig_class);
1540                         (void) mutex_unlock(&mdmn_busy_mutex[setno]);
1541 
1542                 }
1543                 if (err == MDMNE_CLASS_BUSY) {
1544                         /*
1545                          * The log is occupied with a different message
1546                          * that needs to be played first.
1547                          * We reject the current message with MDMNE_CLASS_BUSY
1548                          * to the initiator and do not unbusy the set/class,
1549                          * because we will proceed with the logged message,
1550                          * which has the same set/class combination
1551                          */
1552                         result->mmr_comm_state = MDMNE_CLASS_BUSY;
1553                 }
1554                 ret = (int *)NULL;
1555                 (void) rw_rdlock(&client_rwlock[setno]);
1556 
1557                 if (check_client(setno, sender)) {
1558                         commd_debug(MD_MMV_SYSLOG,
1559                             "proc_mas: No client for initiator \n");
1560                 } else {
1561                         ret = mdmn_wakeup_initiator_2(result,
1562                             client[setno][sender], sender);
1563                 }
1564                 (void) rw_unlock(&client_rwlock[setno]);
1565 
1566                 if (ret == (int *)NULL) {
1567                         commd_debug(MD_MMV_SYSLOG,
1568                             "proc_mas: couldn't wakeup_initiator \n");
1569                 } else {
1570                         if (*ret != MDMNE_ACK) {
1571                                 commd_debug(MD_MMV_SYSLOG, "proc_mas: "
1572                                     "wakeup_initiator returned %d\n", *ret);
1573                         }
1574                         free(ret);
1575                 }
1576                 free_msg(msg);
1577 
1578                 if (err == MDMNE_LOG_FAIL) {
1579                         /* we can't proceed here */
1580                         free_result(result);
1581                         (void) rw_unlock(&set_desc_rwlock[setno]);
1582                         return;
1583                 } else if (err == MDMNE_CLASS_BUSY) {
1584                         mdmn_changelog_record_t *lr;
1585                         lr = mdmn_get_changelogrec(setno, orig_class);
1586                         assert(lr != NULL);
1587 
1588                         /* proceed with the logged message */
1589                         msg = copy_msg(&(lr->lr_msg), NULL);
1590 
1591                         /*
1592                          * The logged message has to have the same class but
1593                          * type and sender can be different
1594                          */
1595                         orig_type = msgtype = msg->msg_type;
1596                         sender  = msg->msg_sender;
1597 
1598                         commd_debug(MD_MMV_PROC_M,
1599                             "proc_mas: Got new message from change log: "
1600                             "(%d,0x%llx-%d) type %d\n",
1601                             MSGID_ELEMS(msg->msg_msgid), msgtype);
1602 
1603                         /* continue normal operation with this message */
1604                 }
1605         }
1606 
1607 proceed:
1608         smgen = mdmn_get_submessage_generator(msgtype);
1609         if (smgen == NULL) {
1610                 /* no submessages to create, just use the original message */
1611                 msglist[0] = msg;
1612                 nmsgs = 1;
1613         } else {
1614                 /* some bits are passed on to submessages */
1615                 inherit_flags = msg->msg_flags & MD_MSGF_INHERIT_BITS;
1616 
1617                 nmsgs = smgen(msg, msglist);
1618 
1619                 /* some settings for the submessages */
1620                 for (curmsg = 0; curmsg < nmsgs; curmsg++) {
1621                         cmsg    = msglist[curmsg];
1622 
1623                         /* Apply the inherited flags */
1624                         cmsg->msg_flags |= inherit_flags;
1625 
1626                         /*
1627                          * Make sure the submessage ID is set correctly
1628                          * Note: first submessage has mid_smid of 1 (not 0)
1629                          */
1630                         cmsg->msg_msgid.mid_smid = curmsg + 1;
1631 
1632                         /* need the original class set in msgID (for MCT) */
1633                         cmsg->msg_msgid.mid_oclass = orig_class;
1634                 }
1635 
1636                 commd_debug(MD_MMV_PROC_M,
1637                     "smgen generated %d submsgs, origclass = %d\n",
1638                     nmsgs, orig_class);
1639         }
1640         /*
1641          * This big loop does the following.
1642          * For all messages:
1643          *      process message on the master first (a message completion
1644          *              table MCT ensures a message is not processed twice)
1645          *      in case of an error break out of message loop
1646          *      for all nodes -- unless MD_MSGF_NO_BCAST is set --
1647          *              send message to node until that succeeds
1648          *              merge result -- not yet implemented
1649          *              respect MD_MSGF_STOP_ON_ERROR
1650          */
1651         for (curmsg = 0; curmsg < nmsgs; curmsg++) {
1652                 int     break_msg_loop = 0;
1653                 mutex_t *mx;            /* protection for class_busy */
1654                 int     master_err;
1655                 int     master_exitval = -1;
1656 
1657                 cmsg    = msglist[curmsg];
1658                 msgtype = cmsg->msg_type;
1659                 class   = mdmn_get_message_class(msgtype);
1660                 node    = NULL;
1661                 mx      = mdmn_get_master_table_mx(setno, class);
1662 
1663                 /* If we are in the abort state, we error out immediately */
1664                 if (md_commd_global_state & MD_CGS_ABORTED) {
1665                         break; /* out of the message loop */
1666                 }
1667 
1668                 commd_debug(MD_MMV_PROC_M, "class=%d, orig_class=%d\n",
1669                     class, orig_class);
1670                 /*
1671                  * If the current class is different from the original class,
1672                  * we have to lock it down.
1673                  * The original class is already marked busy.
1674                  * At this point we cannot refuse the message because the
1675                  * class is busy right now, so we wait until the class becomes
1676                  * available again. As soon as something changes for this set
1677                  * we will be cond_signal'ed (in mdmn_mark_class_unbusy)
1678                  *
1679                  * Granularity could be finer (setno/class)
1680                  */
1681                 if (class != orig_class) {
1682                         (void) mutex_lock(&mdmn_busy_mutex[setno]);
1683                         while (mdmn_mark_class_busy(setno, class) == FALSE) {
1684                                 (void) cond_wait(&mdmn_busy_cv[setno],
1685                                     &mdmn_busy_mutex[setno]);
1686                         }
1687                         (void) mutex_unlock(&mdmn_busy_mutex[setno]);
1688                 }
1689 
1690                 master_err = do_message_locally(cmsg, result);
1691 
1692                 if ((master_err != MDMNE_ACK) ||
1693                     ((master_err == MDMNE_ACK) && (result->mmr_exitval != 0))) {
1694                         result->mmr_failing_node = set_master;
1695                         if (cmsg->msg_flags & MD_MSGF_STOP_ON_ERROR) {
1696                                 /*
1697                                  * if appropriate, unbusy the class and
1698                                  * break out of the message loop
1699                                  */
1700                                 if (class != orig_class) {
1701                                         (void) mutex_lock(
1702                                             &mdmn_busy_mutex[setno]);
1703                                         mdmn_mark_class_unbusy(setno, class);
1704                                         (void) mutex_unlock(
1705                                             &mdmn_busy_mutex[setno]);
1706                                 }
1707                                 break;
1708                         }
1709                 }
1710 
1711                 if (master_err == MDMNE_ACK)
1712                         master_exitval = result->mmr_exitval;
1713 
1714                 /* No broadcast? => next message */
1715                 if (cmsg->msg_flags & MD_MSGF_NO_BCAST) {
1716                         /* if appropriate, unbusy the class */
1717                         if (class != orig_class) {
1718                                 (void) mutex_lock(&mdmn_busy_mutex[setno]);
1719                                 mdmn_mark_class_unbusy(setno, class);
1720                                 (void) mutex_unlock(&mdmn_busy_mutex[setno]);
1721                         }
1722                         continue;
1723                 }
1724 
1725 
1726                 /* fake sender, so we get notified when the results are avail */
1727                 cmsg->msg_sender = set_master;
1728                 /*
1729                  * register to the master_table. It's needed by wakeup_master to
1730                  * wakeup the sleeping thread.
1731                  * Access is protected by the class lock: mdmn_mark_class_busy()
1732                  */
1733                 mdmn_set_master_table_id(setno, class, &(cmsg->msg_msgid));
1734 
1735 
1736 
1737                 (void) rw_rdlock(&set_desc_rwlock[setno]);
1738                 /* Send the message  to all other nodes */
1739                 for (node = set_descriptor[setno]->sd_nodelist; node;
1740                     node = node->nd_next) {
1741                         md_mn_nodeid_t nid = node->nd_nodeid;
1742 
1743                         /* We are master and have already processed the msg */
1744                         if (node == set_descriptor[setno]->sd_mn_masternode) {
1745                                 continue;
1746                         }
1747 
1748                         /* If this node didn't join the disk set, ignore it */
1749                         if ((node->nd_flags & MD_MN_NODE_OWN) == 0) {
1750                                 continue;
1751                         }
1752 
1753                         /* If a DIRECTED message, skip non-recipient nodes */
1754                         if ((cmsg->msg_flags & MD_MSGF_DIRECTED) &&
1755                             nid != cmsg->msg_recipient) {
1756                                 continue;
1757                         }
1758 
1759                         (void) mutex_lock(mx);
1760                         /*
1761                          * Register the node that is addressed,
1762                          * so we can detect unsolicited messages
1763                          */
1764                         mdmn_set_master_table_addr(setno, class, nid);
1765                         slave_result = (md_mn_result_t *)NULL;
1766 
1767                         /*
1768                          * Now send it. do_send_message() will return if
1769                          *      a failure occurs or
1770                          *      the results are available
1771                          */
1772                         err = do_send_message(cmsg, node);
1773 
1774                         /*  in abort state, we error out immediately */
1775                         if (md_commd_global_state & MD_CGS_ABORTED) {
1776                                 break;
1777                         }
1778 
1779                         if (err == MDMNE_ACK) {
1780                                 slave_result =
1781                                     mdmn_get_master_table_res(setno, class);
1782                                 commd_debug(MD_MMV_PROC_M,
1783                                     "proc_mas: got result for (%d,0x%llx-%d)\n",
1784                                     MSGID_ELEMS(cmsg->msg_msgid));
1785                         } else if (err == MDMNE_IGNORE_NODE) {
1786                                 (void) mutex_unlock(mx);
1787                                 continue; /* send to next node */
1788                         }
1789                         (void) mutex_unlock(mx);
1790 
1791 
1792                         /*
1793                          * If the result is NULL, or err doesn't show success,
1794                          * something went wrong with this RPC call.
1795                          */
1796                         if ((slave_result == NULL) || (err != MDMNE_ACK)) {
1797                                 /*
1798                                  * If PANIC_WHEN_INCONSISTENT set,
1799                                  * panic if the master succeeded while
1800                                  * this node failed
1801                                  */
1802                                 if ((cmsg->msg_flags &
1803                                     MD_MSGF_PANIC_WHEN_INCONSISTENT) &&
1804                                     (master_err == MDMNE_ACK))
1805                                         panic_system(nid, cmsg->msg_type,
1806                                             master_err, master_exitval,
1807                                             slave_result);
1808 
1809                                 result->mmr_failing_node = nid;
1810                                 /* are we supposed to stop in case of error? */
1811                                 if (cmsg->msg_flags & MD_MSGF_STOP_ON_ERROR) {
1812                                         result->mmr_exitval = MDMNE_RPC_FAIL;
1813                                         commd_debug(MD_MMV_SYSLOG, "proc_mas: "
1814                                             "result (%d,0x%llx-%d) is NULL\n",
1815                                             MSGID_ELEMS(cmsg->msg_msgid));
1816                                         FLUSH_DEBUGFILE();
1817                                         break_msg_loop = 1;
1818                                         break; /* out of node loop first */
1819                                 } else {
1820                                         /* send msg to the next node */
1821                                         continue;
1822                                 }
1823 
1824                         }
1825 
1826                         /*
1827                          * Message processed on remote node.
1828                          * If PANIC_WHEN_INCONSISTENT set, panic if the
1829                          * result is different on this node from the result
1830                          * on the master
1831                          */
1832                         if ((cmsg->msg_flags &
1833                             MD_MSGF_PANIC_WHEN_INCONSISTENT) &&
1834                             ((master_err != MDMNE_ACK) ||
1835                             (slave_result->mmr_exitval != master_exitval)))
1836                                 panic_system(nid, cmsg->msg_type, master_err,
1837                                     master_exitval, slave_result);
1838 
1839                         /*
1840                          * At this point we know we have a message that was
1841                          * processed on the remote node.
1842                          * We now check if the exitval is non zero.
1843                          * In that case we discard the previous result and
1844                          * rather use the current.
1845                          * This means: If a message fails on no node,
1846                          * the result from the master will be returned.
1847                          * There's currently no such thing as merge of results
1848                          * If additionally STOP_ON_ERROR is set, we bail out
1849                          */
1850                         if (slave_result->mmr_exitval != 0) {
1851                                 /* throw away the previously allocated result */
1852                                 free_result(result);
1853 
1854                                 /* copy_result() allocates new memory */
1855                                 result = copy_result(slave_result);
1856                                 free_result(slave_result);
1857 
1858                                 dump_result(MD_MMV_PROC_M, "proc_mas", result);
1859 
1860                                 result->mmr_failing_node = nid;
1861                                 if (cmsg->msg_flags & MD_MSGF_STOP_ON_ERROR) {
1862                                         break_msg_loop = 1;
1863                                         break; /* out of node loop */
1864                                 }
1865                                 continue; /* try next node */
1866 
1867                         } else {
1868                                 /*
1869                                  * MNIssue: may want to merge the results
1870                                  * from all slaves.  Currently only report
1871                                  * the results from the master.
1872                                  */
1873                                 free_result(slave_result);
1874                         }
1875 
1876                 } /* End of loop over the nodes */
1877                 (void) rw_unlock(&set_desc_rwlock[setno]);
1878 
1879 
1880                 /* release the current class again */
1881                 if (class != orig_class) {
1882                         (void) mutex_lock(&mdmn_busy_mutex[setno]);
1883                         mdmn_mark_class_unbusy(setno, class);
1884                         (void) mutex_unlock(&mdmn_busy_mutex[setno]);
1885                 }
1886 
1887                 /* are we supposed to quit entirely ? */
1888                 if (break_msg_loop ||
1889                     (md_commd_global_state & MD_CGS_ABORTED)) {
1890                         break; /* out of msg loop */
1891                 }
1892 
1893         } /* End of loop over the messages */
1894         /*
1895          * If we are here, there's two possibilities:
1896          *      - we processed all messages on all nodes without an error.
1897          *          In this case we return the result from the master.
1898          *          (to be implemented: return the merged result)
1899          *      - we encountered an error in which case result has been
1900          *          set accordingly already.
1901          */
1902 
1903         if (md_commd_global_state & MD_CGS_ABORTED) {
1904                 result->mmr_comm_state = MDMNE_ABORT;
1905         }
1906 
1907         /*
1908          * This message has been processed completely.
1909          * Remove it from the changelog.
1910          * Do this for replay messages too.
1911          * Note that the message is unlogged before waking up the
1912          * initiator.  This is done for two reasons.
1913          * 1. Remove a race condition that occurs when back to back
1914          *   messages are sent for the same class, the registeration is
1915          *   is lost.
1916          * 2. If the initiator died but the action was completed on all the
1917          *   the nodes, we want that to be marked "done" quickly.
1918          */
1919 
1920         if ((msg->msg_flags & MD_MSGF_NO_LOG) == 0) {
1921                 commd_debug(MD_MMV_PROC_M,
1922                     "proc_mas: calling unlog_msg for (%d,0x%llx-%d) type %d\n",
1923                     MSGID_ELEMS(msg->msg_msgid), msgtype);
1924                 (void) mdmn_unlog_msg(msg);
1925                 commd_debug(MD_MMV_PROC_M,
1926                     "proc_mas: done unlog_msg for (%d,0x%llx-%d) type %d\n",
1927                     MSGID_ELEMS(msg->msg_msgid), msgtype);
1928         }
1929 
1930         /*
1931          * In case of submessages, we increased the submessage ID in the
1932          * result structure. We restore the message ID to the value that
1933          * the initiator is waiting for.
1934          */
1935         result->mmr_msgid.mid_smid   = 0;
1936         result->mmr_msgtype          = orig_type;
1937         result->mmr_sender           = set_master;
1938 
1939         /* if we have an inited client, send result */
1940         ret = (int *)NULL;
1941 
1942         (void) rw_rdlock(&client_rwlock[setno]);
1943         if (check_client(setno, sender)) {
1944                 commd_debug(MD_MMV_SYSLOG,
1945                     "proc_mas: unable to create client for initiator\n");
1946         } else {
1947                 ret = mdmn_wakeup_initiator_2(result, client[setno][sender],
1948                     sender);
1949         }
1950         (void) rw_unlock(&client_rwlock[setno]);
1951 
1952         if (ret == (int *)NULL) {
1953                 commd_debug(MD_MMV_PROC_M,
1954                     "proc_mas: couldn't wakeup initiator\n");
1955         } else {
1956                 if (*ret != MDMNE_ACK) {
1957                         commd_debug(MD_MMV_PROC_M,
1958                             "proc_mas: wakeup_initiator returned %d\n",
1959                             *ret);
1960                 }
1961                 free(ret);
1962         }
1963 
1964         (void) rw_unlock(&set_desc_rwlock[setno]);
1965         /* Free all submessages, if there were any */
1966         if (nmsgs > 1) {
1967                 for (curmsg = 0; curmsg < nmsgs; curmsg++) {
1968                         free_msg(msglist[curmsg]);
1969                 }
1970         }
1971         /* Free the result */
1972         free_result(result);
1973 
1974         (void) mutex_lock(&mdmn_busy_mutex[setno]);
1975         mdmn_mark_class_unbusy(setno, orig_class);
1976         (void) mutex_unlock(&mdmn_busy_mutex[setno]);
1977 
1978 
1979         /*
1980          * We use this ioctl just to get the time in the same format as used in
1981          * the messageID. If it fails, all we get is a bad runtime output.
1982          */
1983         (void) metaioctl(MD_IOCGUNIQMSGID, &dummyid, &mde, NULL);
1984         secdiff = (dummyid.mid_time - msg->msg_msgid.mid_time) >> 32;
1985         usecdiff = (dummyid.mid_time - msg->msg_msgid.mid_time) & 0xfffff;
1986 
1987         /* catching possible overflow */
1988         if (usecdiff >= 1000000) {
1989                 usecdiff -= 1000000;
1990                 secdiff++;
1991         }
1992 
1993 
1994         commd_debug(MD_MMV_PROC_M, "proc_mas: done (%d, 0x%llx-%d) type=%02d "
1995             "%5d.%06d secs runtime\n",
1996             MSGID_ELEMS(msg->msg_msgid), orig_type, secdiff, usecdiff);
1997 
1998         /* Free the original message */
1999         free_msg(msg);
2000 }
2001 
2002 void
2003 mdmn_slave_process_msg(md_mn_msg_t *msg)
2004 {
2005         int                     *ret = NULL;
2006         int                     completed;
2007         int                     retries;
2008         int                     successfully_returned;
2009         set_t                   setno;
2010         md_mn_result_t          *result;
2011         md_mn_nodeid_t          sender;
2012         md_mn_nodeid_t          whoami;
2013         md_mn_msgtype_t         msgtype;
2014         md_mn_msgclass_t        class;
2015 
2016         void (*handler)(md_mn_msg_t *msg, uint_t flags, md_mn_result_t *res);
2017 
2018         setno   = msg->msg_setno;
2019         sender  = msg->msg_sender; /* this is always the master of the set */
2020         msgtype = msg->msg_type;
2021 
2022         (void) rw_rdlock(&set_desc_rwlock[setno]);
2023         whoami          = set_descriptor[setno]->sd_mn_mynode->nd_nodeid;
2024         (void) rw_unlock(&set_desc_rwlock[setno]);
2025 
2026         result = Zalloc(sizeof (md_mn_result_t));
2027         result->mmr_flags    = msg->msg_flags;
2028         result->mmr_setno    = setno;
2029         result->mmr_msgtype  = msgtype;
2030         result->mmr_sender   = whoami;
2031         result->mmr_comm_state       = MDMNE_ACK; /* Ok state */
2032         MSGID_COPY(&(msg->msg_msgid), &(result->mmr_msgid));
2033         class = mdmn_get_message_class(msgtype);
2034 
2035         commd_debug(MD_MMV_PROC_S,
2036             "proc_sla: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d\n",
2037             MSGID_ELEMS(msg->msg_msgid), setno, class, msgtype);
2038 
2039         handler = mdmn_get_handler(msgtype);
2040 
2041         if (handler == NULL) {
2042                 result->mmr_exitval = 0;
2043                 /* let the sender decide if this is an error or not */
2044                 result->mmr_comm_state = MDMNE_NO_HANDLER;
2045                 commd_debug(MD_MMV_PROC_S,
2046                     "proc_sla: No handler for (%d, 0x%llx-%d)\n",
2047                     MSGID_ELEMS(msg->msg_msgid));
2048         } else {
2049 
2050                 /* Did we already process this message ? */
2051                 (void) mutex_lock(&mct_mutex[setno][class]);
2052                 completed = mdmn_check_completion(msg, result);
2053 
2054                 if (completed == MDMN_MCT_NOT_DONE) {
2055                         /* message not yet processed locally */
2056                         commd_debug(MD_MMV_PROC_S,
2057                             "proc_sla: calling handler for (%d, 0x%llx-%d)\n",
2058                             MSGID_ELEMS(msg->msg_msgid));
2059 
2060                         /*
2061                          * Mark the message as being currently processed,
2062                          * so we won't start a second handler for it
2063                          */
2064                         (void) mdmn_mark_completion(msg, NULL,
2065                             MDMN_MCT_IN_PROGRESS);
2066 
2067                         (void) mutex_unlock(&mct_mutex[setno][class]);
2068                         (*handler)(msg, MD_MSGF_ON_SLAVE, result);
2069 
2070                         commd_debug(MD_MMV_PROC_S,
2071                             "proc_sla: finished handler for (%d, 0x%llx-%d)\n",
2072                             MSGID_ELEMS(msg->msg_msgid));
2073 
2074                         (void) mutex_lock(&mct_mutex[setno][class]);
2075                         /* Mark the message as fully done, store the result */
2076                         (void) mdmn_mark_completion(msg, result, MDMN_MCT_DONE);
2077 
2078                 } else if (completed == MDMN_MCT_DONE) {
2079                         /* message processed previously, got result from MCT */
2080                         commd_debug(MD_MMV_PROC_S,
2081                             "proc_sla: result for (%d, 0x%llx-%d) from MCT\n",
2082                             MSGID_ELEMS(msg->msg_msgid));
2083                 } else if (completed == MDMN_MCT_IN_PROGRESS) {
2084                         /*
                         * If the message is currently being processed,
2086                          * we can return here, without sending a result back.
2087                          * This will be done by the initial message handling
2088                          * thread
2089                          */
2090                         (void) mutex_unlock(&mct_mutex[setno][class]);
2091                         commd_debug(MD_MMV_PROC_M, "proc_sla: "
2092                             "(%d, 0x%llx-%d) is currently being processed\n",
2093                             MSGID_ELEMS(msg->msg_msgid), msgtype);
2094 
2095                         free_msg(msg);
2096                         free_result(result);
2097                         return;
2098                 } else {
2099                         /* MCT error occurred (should never happen) */
2100                         result->mmr_comm_state = MDMNE_LOG_FAIL;
2101                         commd_debug(MD_MMV_PROC_S,
2102                             "proc_sla: MCT error for (%d, 0x%llx-%d)\n",
2103                             MSGID_ELEMS(msg->msg_msgid));
2104                 }
2105                 (void) mutex_unlock(&mct_mutex[setno][class]);
2106         }
2107 
2108         /*
2109          * At this point we have a result (even in an error case)
2110          * that we return to the master.
2111          */
2112         (void) rw_rdlock(&set_desc_rwlock[setno]);
2113         retries = 2; /* we will try two times to send the results */
2114         successfully_returned = 0;
2115 
2116         while (!successfully_returned && (retries != 0)) {
2117                 ret = (int *)NULL;
2118                 (void) rw_rdlock(&client_rwlock[setno]);
2119                 if (check_client(setno, sender)) {
2120                         /*
2121                          * If we cannot setup the rpc connection to the master,
2122                          * we can't do anything besides logging this fact.
2123                          */
2124                         commd_debug(MD_MMV_SYSLOG,
2125                             "proc_mas: unable to create client for master\n");
2126                         (void) rw_unlock(&client_rwlock[setno]);
2127                         break;
2128                 } else {
2129                         ret = mdmn_wakeup_master_2(result,
2130                             client[setno][sender], sender);
2131                         /*
2132                          * if mdmn_wakeup_master_2 returns NULL, it can be that
2133                          * the master (or the commd on the master) had died.
2134                          * In that case, we destroy the client to the master
2135                          * and retry.
2136                          * If mdmn_wakeup_master_2 doesn't return MDMNE_ACK,
2137                          * the commd on the master is alive but
2138                          * something else is wrong,
2139                          * in that case a retry doesn't make sense => break out
2140                          */
2141                         if (ret == (int *)NULL) {
2142                                 commd_debug(MD_MMV_PROC_S,
2143                                     "proc_sla: wakeup_master returned NULL\n");
2144                                 /* release reader lock, grab writer lock */
2145                                 (void) rw_unlock(&client_rwlock[setno]);
2146                                 (void) rw_wrlock(&client_rwlock[setno]);
2147                                 mdmn_clnt_destroy(client[setno][sender]);
2148                                 if (client[setno][sender] != (CLIENT *)NULL) {
2149                                         client[setno][sender] = (CLIENT *)NULL;
2150                                 }
2151                                 (void) rw_unlock(&client_rwlock[setno]);
2152                                 retries--;
2153                                 commd_debug(MD_MMV_PROC_S,
2154                                     "retries = %d\n", retries);
2155                                 continue;
2156                         }
2157                         if (*ret != MDMNE_ACK) {
2158                                 commd_debug(MD_MMV_PROC_S, "proc_sla: "
2159                                     "wakeup_master returned %d\n", *ret);
2160                                 (void) rw_unlock(&client_rwlock[setno]);
2161                                 break;
2162                         } else { /* Good case */
2163                                 successfully_returned = 1;
2164                                 (void) rw_unlock(&client_rwlock[setno]);
2165                         }
2166                 }
2167         }
2168 
2169         (void) rw_unlock(&set_desc_rwlock[setno]);
2170         commd_debug(MD_MMV_PROC_S, "proc_sla: done (%d, 0x%llx-%d)\n",
2171             MSGID_ELEMS(msg->msg_msgid));
2172 
2173         if (ret != (int *)NULL)
2174                 free(ret);
2175         free_msg(msg);
2176         free_result(result);
2177 }
2178 
2179 
2180 /*
2181  * mdmn_send_svc_2:
2182  * ---------------
2183  * Check that the issuing node is a legitimate one (i.e. is licensed to send
2184  * messages to us), that the RPC request can be staged.
2185  *
2186  * Returns:
2187  *      0       => no RPC request is in-flight, no deferred svc_sendreply()
2188  *      1       => queued RPC request in-flight. Completion will be made (later)
2189  *                 by a wakeup_initiator_2() [hopefully]
2190  */
2191 int
2192 mdmn_send_svc_2(md_mn_msg_t *omsg, struct svc_req *rqstp)
2193 {
2194         int                     err;
2195         set_t                   setno;
2196         SVCXPRT                 *transp = rqstp->rq_xprt;
2197         md_mn_msg_t             *msg;
2198         md_mn_result_t          *resultp;
2199         md_mn_msgclass_t        class;
2200         md_mn_msg_and_transp_t  *matp;
2201 
2202         msg = copy_msg(omsg, NULL);
2203         xdr_free(xdr_md_mn_msg_t, (caddr_t)omsg);
2204 
2205         setno = msg->msg_setno;
2206         class = mdmn_get_message_class(msg->msg_type);
2207 
2208         /* If we are in the abort state, we error out immediately */
2209         if (md_commd_global_state & MD_CGS_ABORTED) {
2210                 resultp = Zalloc(sizeof (md_mn_result_t));
2211                 resultp->mmr_comm_state = MDMNE_ABORT;
2212                 mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2213                 free_result(resultp);
2214                 svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2215                 return (0);
2216         }
2217 
2218         /* check if the global initialization is done */
2219         if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2220                 global_init();
2221         }
2222 
2223         commd_debug(MD_MMV_SEND,
2224             "send: received (%d, 0x%llx-%d), set=%d, class=%d, type=%d\n",
2225             MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type);
2226 
2227         /* Check for verbosity related message */
2228         if (msg->msg_type == MD_MN_MSG_VERBOSITY) {
2229                 md_mn_verbose_t *d;
2230 
2231                 d = (md_mn_verbose_t *)((void *)(msg->msg_event_data));
2232                 md_commd_global_verb = d->mmv_what;
2233                 /* everytime the bitmask is set, we reset the timer */
2234                 __savetime = gethrtime();
2235                 /*
2236                  * If local-only-flag is set, we are done here,
2237                  * otherwise we pass that message on to the master.
2238                  */
2239                 if (msg->msg_flags & MD_MSGF_LOCAL_ONLY) {
2240                         resultp = Zalloc(sizeof (md_mn_result_t));
2241                         resultp->mmr_comm_state = MDMNE_ACK;
2242                         mdmn_svc_sendreply(transp, xdr_md_mn_result_t,
2243                             (char *)resultp);
2244                         free_result(resultp);
2245                         svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2246                         return (0);
2247                 }
2248         }
2249 
2250         /*
2251          * Are we entering the abort state?
2252          * Here we don't even need to check for MD_MSGF_LOCAL_ONLY, because
2253          * this message cannot be distributed anyway.
2254          * So, it's safe to return immediately.
2255          */
2256         if (msg->msg_type == MD_MN_MSG_ABORT) {
2257                 md_commd_global_state |= MD_CGS_ABORTED;
2258                 resultp = Zalloc(sizeof (md_mn_result_t));
2259                 resultp->mmr_comm_state = MDMNE_ACK;
2260                 mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2261                 free_result(resultp);
2262                 svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2263                 return (0);
2264         }
2265 
2266 
2267         /*
2268          * Is this message type blocked?
2269          * If so we return MDMNE_CLASS_LOCKED, immediately
2270          */
2271         if (msgtype_lock_state[msg->msg_type] == MMTL_LOCK) {
2272                 resultp = Zalloc(sizeof (md_mn_result_t));
2273                 resultp->mmr_comm_state = MDMNE_CLASS_LOCKED;
2274                 mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2275                 free_result(resultp);
2276                 svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2277                 commd_debug(MD_MMV_SEND,
2278                     "send: type locked (%d, 0x%llx-%d), set=%d, class=%d, "
2279                     "type=%d\n", MSGID_ELEMS(msg->msg_msgid), setno, class,
2280                     msg->msg_type);
2281                 return (0);
2282         }
2283 
2284 
2285         if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2286                 /* Can only use the appropriate mutexes if they are inited */
2287                 if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2288                         (void) rw_wrlock(&set_desc_rwlock[setno]);
2289                         (void) rw_wrlock(&client_rwlock[setno]);
2290                         err = mdmn_init_set(setno, MDMN_SET_READY);
2291                         (void) rw_unlock(&client_rwlock[setno]);
2292                         (void) rw_unlock(&set_desc_rwlock[setno]);
2293                 } else {
2294                         err = mdmn_init_set(setno, MDMN_SET_READY);
2295                 }
2296 
2297                 if (err) {
2298                         /* couldn't initialize connections, cannot proceed */
2299                         resultp = Zalloc(sizeof (md_mn_result_t));
2300                         resultp->mmr_comm_state = err;
2301                         mdmn_svc_sendreply(transp, xdr_md_mn_result_t,
2302                             (char *)resultp);
2303                         svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2304                         free_result(resultp);
2305                         commd_debug(MD_MMV_SEND,
2306                             "send: init err = %d\n", err);
2307                         return (0);
2308                 }
2309         }
2310 
2311         (void) mutex_lock(&mdmn_busy_mutex[setno]);
2312         if ((mdmn_is_class_suspended(setno, class) == TRUE) &&
2313             ((msg->msg_flags & MD_MSGF_OVERRIDE_SUSPEND) == 0)) {
2314                 (void) mutex_unlock(&mdmn_busy_mutex[setno]);
2315                 resultp = Zalloc(sizeof (md_mn_result_t));
2316                 resultp->mmr_comm_state = MDMNE_SUSPENDED;
2317                 mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)resultp);
2318                 svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2319                 free_result(resultp);
2320                 commd_debug(MD_MMV_SEND,
2321                     "send: class suspended (%d, 0x%llx-%d), set=%d, "
2322                     "class=%d, type=%d\n", MSGID_ELEMS(msg->msg_msgid),
2323                     setno, class, msg->msg_type);
2324                 return (0);
2325         }
2326         (void) mutex_unlock(&mdmn_busy_mutex[setno]);
2327 
2328         /* is this rpc request coming from the local node? */
2329         if (check_license(rqstp, 0) == FALSE) {
2330                 svc_freeargs(transp, xdr_md_mn_msg_t, (caddr_t)msg);
2331                 commd_debug(MD_MMV_SEND,
2332                     "send: check licence fail(%d, 0x%llx-%d), set=%d, "
2333                     "class=%d, type=%d\n", MSGID_ELEMS(msg->msg_msgid),
2334                     setno, class, msg->msg_type);
2335                 return (0);
2336         }
2337 
2338 
2339         /*
2340          * We allocate a structure that can take two pointers in order to pass
2341          * both the message and the transp into thread_create.
2342          * The free for this alloc is done in mdmn_send_to_work()
2343          */
2344         matp = Malloc(sizeof (md_mn_msg_and_transp_t));
2345         matp->mat_msg = msg;
2346         matp->mat_transp = transp;
2347 
2348         /*
2349          * create a thread here that calls work on the master.
2350          * If we are already on the master, this would block if running
2351          * in the same context. (our service is single threaded)(
2352          * Make it a detached thread because it will not communicate with
2353          * anybody thru thr_* mechanisms
2354          */
2355         (void) thr_create(NULL, 0, mdmn_send_to_work, (void *) matp,
2356             THR_DETACHED, NULL);
2357 
2358         commd_debug(MD_MMV_SEND, "send: done (%d, 0x%llx-%d)\n",
2359             MSGID_ELEMS(msg->msg_msgid));
2360         /*
2361          * We return here without sending results. This will be done by
2362          * mdmn_wakeup_initiator_svc_2() as soon as the results are available.
2363          * Until then the calling send_message will be blocked, while we
2364          * are able to take calls.
2365          */
2366 
2367         return (1);
2368 }
2369 
2370 /* ARGSUSED */
2371 int *
2372 mdmn_work_svc_2(md_mn_msg_t *omsg, struct svc_req *rqstp)
2373 {
2374         int             err;
2375         set_t           setno;
2376         thread_t        tid;
2377         int             *retval;
2378         md_mn_msg_t     *msg;
2379         md_mn_msgclass_t class;
2380 
2381         retval = Malloc(sizeof (int));
2382 
2383         /* If we are in the abort state, we error out immediately */
2384         if (md_commd_global_state & MD_CGS_ABORTED) {
2385         xdr_free(xdr_md_mn_msg_t, (caddr_t)omsg);
2386                 *retval = MDMNE_ABORT;
2387                 return (retval);
2388         }
2389 
2390         msg = copy_msg(omsg, NULL);
2391         xdr_free(xdr_md_mn_msg_t, (caddr_t)omsg);
2392 
2393         /*
2394          * Is this message type blocked?
2395          * If so we return MDMNE_CLASS_LOCKED, immediately.
2396          * This check is performed on master and slave.
2397          */
2398         if (msgtype_lock_state[msg->msg_type] == MMTL_LOCK) {
2399                 *retval = MDMNE_CLASS_LOCKED;
2400                 return (retval);
2401         }
2402 
2403         /* check if the global initialization is done */
2404         if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2405                 global_init();
2406         }
2407 
2408         class = mdmn_get_message_class(msg->msg_type);
2409         setno = msg->msg_setno;
2410 
2411         if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2412                 /* Can only use the appropriate mutexes if they are inited */
2413                 if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2414                         (void) rw_wrlock(&set_desc_rwlock[setno]);
2415                         (void) rw_wrlock(&client_rwlock[setno]);
2416                         err = mdmn_init_set(setno, MDMN_SET_READY);
2417                         (void) rw_unlock(&client_rwlock[setno]);
2418                         (void) rw_unlock(&set_desc_rwlock[setno]);
2419                 } else {
2420                         err = mdmn_init_set(setno, MDMN_SET_READY);
2421                 }
2422 
2423                 if (err) {
2424                         *retval = MDMNE_CANNOT_CONNECT;
2425                         free_msg(msg);
2426                         return (retval);
2427                 }
2428         }
2429 
2430         /* is this rpc request coming from a licensed node? */
2431         if (check_license(rqstp, msg->msg_sender) == FALSE) {
2432                 free_msg(msg);
2433                 *retval = MDMNE_RPC_FAIL;
2434                 return (retval);
2435         }
2436 
2437         commd_debug(MD_MMV_WORK,
2438             "work: received (%d, 0x%llx-%d), set=%d, class=%d, type=%d, "
2439             "flags=0x%x\n",
2440             MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type,
2441             msg->msg_flags);
2442 
2443         /* Check for various CLASS0 message types */
2444         if (msg->msg_type == MD_MN_MSG_VERBOSITY) {
2445                 md_mn_verbose_t *d;
2446 
2447                 d = (md_mn_verbose_t *)((void *)(msg->msg_event_data));
2448                 /* for now we ignore set / class in md_mn_verbose_t */
2449                 md_commd_global_verb = d->mmv_what;
2450                 /* everytime the bitmask is set, we reset the timer */
2451                 __savetime = gethrtime();
2452         }
2453 
2454         (void) mutex_lock(&mdmn_busy_mutex[setno]);
2455 
2456         /* check if class is locked via a call to mdmn_comm_lock_svc_2 */
2457         if (mdmn_is_class_locked(setno, class) == TRUE) {
2458                 (void) mutex_unlock(&mdmn_busy_mutex[setno]);
2459                 *retval = MDMNE_CLASS_LOCKED;
2460                 free_msg(msg);
2461                 return (retval);
2462         }
2463         (void) mutex_unlock(&mdmn_busy_mutex[setno]);
2464 
2465         /* Check if the class is busy right now. Do it only on the master */
2466         (void) rw_rdlock(&set_desc_rwlock[setno]);
2467         if (set_descriptor[setno]->sd_mn_am_i_master) {
2468                 (void) rw_unlock(&set_desc_rwlock[setno]);
2469                 /*
2470                  * If the class is currently suspended, don't accept new
2471                  * messages, unless they are flagged with an override bit.
2472                  */
2473                 (void) mutex_lock(&mdmn_busy_mutex[setno]);
2474                 if ((mdmn_is_class_suspended(setno, class) == TRUE) &&
2475                     ((msg->msg_flags & MD_MSGF_OVERRIDE_SUSPEND) == 0)) {
2476                         (void) mutex_unlock(&mdmn_busy_mutex[setno]);
2477                         *retval = MDMNE_SUSPENDED;
2478                         commd_debug(MD_MMV_SEND,
2479                             "send: set %d is suspended\n", setno);
2480                         free_msg(msg);
2481                         return (retval);
2482                 }
2483                 if (mdmn_mark_class_busy(setno, class) == FALSE) {
2484                         (void) mutex_unlock(&mdmn_busy_mutex[setno]);
2485                         *retval = MDMNE_CLASS_BUSY;
2486                         free_msg(msg);
2487                         return (retval);
2488                 }
2489                 (void) mutex_unlock(&mdmn_busy_mutex[setno]);
2490                 /*
2491                  * Because the real processing of the message takes time we
2492                  * create a thread for it. So the master thread can continue
2493                  * to run and accept further messages.
2494                  */
2495                 *retval = thr_create(NULL, 0,
2496                     (void *(*)(void *))mdmn_master_process_msg, (void *)msg,
2497                     THR_DETACHED|THR_SUSPENDED, &tid);
2498         } else {
2499                 (void) rw_unlock(&set_desc_rwlock[setno]);
2500                 *retval = thr_create(NULL, 0,
2501                     (void *(*)(void *)) mdmn_slave_process_msg, (void *)msg,
2502                     THR_DETACHED|THR_SUSPENDED, &tid);
2503         }
2504 
2505         if (*retval != 0) {
2506                 *retval = MDMNE_THR_CREATE_FAIL;
2507                 free_msg(msg);
2508                 return (retval);
2509         }
2510 
2511         /* Now run the new thread */
2512         (void) thr_continue(tid);
2513 
2514         commd_debug(MD_MMV_WORK,
2515             "work: done (%d, 0x%llx-%d), set=%d, class=%d, type=%d\n",
2516             MSGID_ELEMS(msg->msg_msgid), setno, class, msg->msg_type);
2517 
2518         *retval = MDMNE_ACK; /* this means success */
2519         return (retval);
2520 }
2521 
2522 /* ARGSUSED */
2523 int *
2524 mdmn_wakeup_initiator_svc_2(md_mn_result_t *res, struct svc_req *rqstp)
2525 {
2526 
2527         int             *retval;
2528         int             err;
2529         set_t           setno;
2530         mutex_t         *mx;   /* protection of initiator_table */
2531         SVCXPRT         *transp = NULL;
2532         md_mn_msgid_t   initiator_table_id;
2533         md_mn_msgclass_t class;
2534 
2535         retval = Malloc(sizeof (int));
2536 
2537         /* check if the global initialization is done */
2538         if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2539                 global_init();
2540         }
2541 
2542         setno   = res->mmr_setno;
2543 
2544         if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2545                 /* set not ready means we just crashed are restarted now */
2546                 /* Can only use the appropriate mutexes if they are inited */
2547                 if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2548                         (void) rw_wrlock(&set_desc_rwlock[setno]);
2549                         (void) rw_wrlock(&client_rwlock[setno]);
2550                         err = mdmn_init_set(setno, MDMN_SET_READY);
2551                         (void) rw_unlock(&client_rwlock[setno]);
2552                         (void) rw_unlock(&set_desc_rwlock[setno]);
2553                 } else {
2554                         err = mdmn_init_set(setno, MDMN_SET_READY);
2555                 }
2556 
2557                 if (err) {
2558                         *retval = MDMNE_CANNOT_CONNECT;
2559                         xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2560                         return (retval);
2561                 }
2562         }
2563 
2564         /* is this rpc request coming from a licensed node? */
2565         if (check_license(rqstp, res->mmr_sender) == FALSE) {
2566                 xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2567                 *retval = MDMNE_RPC_FAIL;
2568                 return (retval);
2569         }
2570 
2571 
2572         class   = mdmn_get_message_class(res->mmr_msgtype);
2573         mx      = mdmn_get_initiator_table_mx(setno, class);
2574 
2575         commd_debug(MD_MMV_WAKE_I,
2576             "wake_ini: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d\n",
2577             MSGID_ELEMS(res->mmr_msgid), setno, class, res->mmr_msgtype);
2578 
2579         (void) mutex_lock(mx);
2580 
2581         /*
2582          * Search the initiator wakeup table.
2583          * If we find an entry here (which should always be true)
2584          * we are on the initiating node and we wakeup the original
2585          * local rpc call.
2586          */
2587         mdmn_get_initiator_table_id(setno, class, &initiator_table_id);
2588 
2589         if (MSGID_CMP(&(initiator_table_id), &(res->mmr_msgid))) {
2590                 transp = mdmn_get_initiator_table_transp(setno, class);
2591                 mdmn_svc_sendreply(transp, xdr_md_mn_result_t, (char *)res);
2592                 svc_done(transp);
2593                 mdmn_unregister_initiator_table(setno, class);
2594                 *retval = MDMNE_ACK;
2595 
2596                 commd_debug(MD_MMV_WAKE_I,
2597                     "wake_ini: replied (%d, 0x%llx-%d)\n",
2598                     MSGID_ELEMS(res->mmr_msgid));
2599         } else {
2600                 commd_debug(MD_MMV_WAKE_I,
2601                     "wakeup initiator: unsolicited message (%d, 0x%llx-%d)\n",
2602                     MSGID_ELEMS(res->mmr_msgid));
2603                 *retval = MDMNE_NO_WAKEUP_ENTRY;
2604         }
2605         (void) mutex_unlock(mx);
2606         /* less work for check_timeouts */
2607         (void) mutex_lock(&check_timeout_mutex);
2608         if (messages_on_their_way == 0) {
2609                 commd_debug(MD_MMV_WAKE_I,
2610                     "Oops, messages_on_their_way < 0 (%d, 0x%llx-%d)\n",
2611                     MSGID_ELEMS(res->mmr_msgid));
2612         } else {
2613                 messages_on_their_way--;
2614         }
2615         (void) mutex_unlock(&check_timeout_mutex);
2616         xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2617 
2618         return (retval);
2619 }
2620 
2621 
2622 /*
2623  * res must be free'd by the thread we wake up
2624  */
2625 /* ARGSUSED */
2626 int *
2627 mdmn_wakeup_master_svc_2(md_mn_result_t *ores, struct svc_req *rqstp)
2628 {
2629 
2630         int             *retval;
2631         int             err;
2632         set_t           setno;
2633         cond_t          *cv;
2634         mutex_t         *mx;
2635         md_mn_msgid_t   master_table_id;
2636         md_mn_nodeid_t  sender;
2637         md_mn_result_t  *res;
2638         md_mn_msgclass_t class;
2639 
2640         retval = Malloc(sizeof (int));
2641 
2642         /* check if the global initialization is done */
2643         if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2644                 global_init();
2645         }
2646 
2647         /* Need to copy the results here, as they are static for RPC */
2648         res = copy_result(ores);
2649         xdr_free(xdr_md_mn_result_t, (caddr_t)ores);
2650 
2651         class = mdmn_get_message_class(res->mmr_msgtype);
2652         setno = res->mmr_setno;
2653 
2654         if (md_mn_set_inited[setno] != MDMN_SET_READY) {
2655                 /* set not ready means we just crashed are restarted now */
2656                 /* Can only use the appropriate mutexes if they are inited */
2657                 if (md_mn_set_inited[setno] & MDMN_SET_MUTEXES) {
2658                         (void) rw_wrlock(&set_desc_rwlock[setno]);
2659                         (void) rw_wrlock(&client_rwlock[setno]);
2660                         err = mdmn_init_set(setno, MDMN_SET_READY);
2661                         (void) rw_unlock(&client_rwlock[setno]);
2662                         (void) rw_unlock(&set_desc_rwlock[setno]);
2663                 } else {
2664                         err = mdmn_init_set(setno, MDMN_SET_READY);
2665                 }
2666 
2667                 if (err) {
2668                         *retval = MDMNE_CANNOT_CONNECT;
2669                         xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2670                         return (retval);
2671                 }
2672         }
2673 
2674         /* is this rpc request coming from a licensed node? */
2675         if (check_license(rqstp, res->mmr_sender) == FALSE) {
2676                 *retval = MDMNE_RPC_FAIL;
2677                 xdr_free(xdr_md_mn_result_t, (caddr_t)res);
2678                 return (retval);
2679         }
2680 
2681 
2682         commd_debug(MD_MMV_WAKE_M,
2683             "wake_mas: received (%d, 0x%llx-%d) set=%d, class=%d, type=%d "
2684             "from %d\n",
2685             MSGID_ELEMS(res->mmr_msgid), setno, class, res->mmr_msgtype,
2686             res->mmr_sender);
2687         /*
2688          * The mutex and cv are needed for waking up the thread
2689          * sleeping in mdmn_master_process_msg()
2690          */
2691         mx = mdmn_get_master_table_mx(setno, class);
2692         cv = mdmn_get_master_table_cv(setno, class);
2693 
2694         /*
2695          * lookup the master wakeup table
2696          * If we find our message, we are on the master and
2697          * called by a slave that finished processing a message.
2698          * We store the results in the appropriate slot and
2699          * wakeup the thread (mdmn_master_process_msg()) waiting for them.
2700          */
2701         (void) mutex_lock(mx);
2702         mdmn_get_master_table_id(setno, class, &master_table_id);
2703         sender = mdmn_get_master_table_addr(setno, class);
2704 
2705         if (MSGID_CMP(&(master_table_id), &(res->mmr_msgid))) {
2706                 if (sender == res->mmr_sender) {
2707                         mdmn_set_master_table_res(setno, class, res);
2708                         (void) cond_signal(cv);
2709                         *retval = MDMNE_ACK;
2710                 } else {
2711                         /* id is correct but wrong sender (I smell a timeout) */
2712                         commd_debug(MD_MMV_WAKE_M,
2713                             "wakeup master got unsolicited message: "
2714                             "(%d, 0x%llx-%d) from %d\n",
2715                             MSGID_ELEMS(res->mmr_msgid), res->mmr_sender);
2716                         free_result(res);
2717                         *retval = MDMNE_TIMEOUT;
2718                 }
2719         } else {
2720                 /* id is wrong, smells like a very late timeout */
2721                 commd_debug(MD_MMV_WAKE_M,
2722                     "wakeup master got unsolicited message: "
2723                     "(%d, 0x%llx-%d) from %d, expected (%d, 0x%llx-%d)\n",
2724                     MSGID_ELEMS(res->mmr_msgid), res->mmr_sender,
2725                     MSGID_ELEMS(master_table_id));
2726                 free_result(res);
2727                 *retval = MDMNE_NO_WAKEUP_ENTRY;
2728         }
2729 
2730         (void) mutex_unlock(mx);
2731 
2732         return (retval);
2733 }
2734 
2735 /*
2736  * Lock a set/class combination.
2737  * This is mainly done for debug purpose.
2738  * This set/class combination immediately is blocked,
2739  * even in the middle of sending messages to multiple slaves.
2740  * This remains until the user issues a mdmn_comm_unlock_svc_2 for the same
2741  * set/class combination.
2742  *
2743  * Special messages of class MD_MSG_CLASS0 can never be locked.
2744  *      e.g. MD_MN_MSG_VERBOSITY, MD_MN_MSG_ABORT
2745  *
2746  * That means, if MD_MSG_CLASS0 is specified, we lock all classes from
2747  * >= MD_MSG_CLASS1 to < MD_MN_NCLASSES
2748  *
2749  * set must be between 1 and MD_MAXSETS
2750  * class can be:
2751  *      MD_MSG_CLASS0 which means all other classes in this case
2752  *      or one specific class (< MD_MN_NCLASSES)
2753  *
2754  * Returns:
2755  *      MDMNE_ACK on sucess (locking a locked class is Ok)
2756  *      MDMNE_EINVAL if a parameter is out of range
2757  */
2758 
2759 /* ARGSUSED */
2760 int *
2761 mdmn_comm_lock_svc_2(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2762 {
2763         int                     *retval;
2764         set_t                   setno = msc->msc_set;
2765         md_mn_msgclass_t        class = msc->msc_class;
2766 
2767         retval = Malloc(sizeof (int));
2768 
2769         /* check if the global initialization is done */
2770         if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2771                 global_init();
2772         }
2773 
2774         /* is this rpc request coming from the local node ? */
2775         if (check_license(rqstp, 0) == FALSE) {
2776                 xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2777                 *retval = MDMNE_RPC_FAIL;
2778                 return (retval);
2779         }
2780 
2781         /* Perform some range checking */
2782         if ((setno == 0) || (setno >= MD_MAXSETS) ||
2783             (class < MD_MSG_CLASS0) || (class >= MD_MN_NCLASSES)) {
2784                 *retval = MDMNE_EINVAL;
2785                 return (retval);
2786         }
2787 
2788         commd_debug(MD_MMV_MISC, "lock: set=%d, class=%d\n", setno, class);
2789         (void) mutex_lock(&mdmn_busy_mutex[setno]);
2790         if (class != MD_MSG_CLASS0) {
2791                 mdmn_mark_class_locked(setno, class);
2792         } else {
2793                 /* MD_MSG_CLASS0 is used as a wild card for all classes */
2794                 for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
2795                         mdmn_mark_class_locked(setno, class);
2796                 }
2797         }
2798         (void) mutex_unlock(&mdmn_busy_mutex[setno]);
2799 
2800         *retval = MDMNE_ACK;
2801         return (retval);
2802 }
2803 
2804 /*
2805  * Unlock a set/class combination.
2806  * set must be between 1 and MD_MAXSETS
2807  * class can be:
2808  *      MD_MSG_CLASS0 which means all other classes in this case (like above)
2809  *      or one specific class (< MD_MN_NCLASSES)
2810  *
2811  * Returns:
2812  *      MDMNE_ACK on sucess (unlocking an unlocked class is Ok)
2813  *      MDMNE_EINVAL if a parameter is out of range
2814  */
2815 /* ARGSUSED */
2816 int *
2817 mdmn_comm_unlock_svc_2(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2818 {
2819         int                     *retval;
2820         set_t                   setno  = msc->msc_set;
2821         md_mn_msgclass_t        class  = msc->msc_class;
2822 
2823         retval = Malloc(sizeof (int));
2824 
2825         /* check if the global initialization is done */
2826         if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2827                 global_init();
2828         }
2829 
2830         /* is this rpc request coming from the local node ? */
2831         if (check_license(rqstp, 0) == FALSE) {
2832                 xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2833                 *retval = MDMNE_RPC_FAIL;
2834                 return (retval);
2835         }
2836 
2837         /* Perform some range checking */
2838         if ((setno == 0) || (setno >= MD_MAXSETS) ||
2839             (class < MD_MSG_CLASS0) || (class >= MD_MN_NCLASSES)) {
2840                 *retval = MDMNE_EINVAL;
2841                 return (retval);
2842         }
2843         commd_debug(MD_MMV_MISC, "unlock: set=%d, class=%d\n", setno, class);
2844 
2845         (void) mutex_lock(&mdmn_busy_mutex[setno]);
2846         if (class != MD_MSG_CLASS0) {
2847                 mdmn_mark_class_unlocked(setno, class);
2848         } else {
2849                 /* MD_MSG_CLASS0 is used as a wild card for all classes */
2850                 for (class = MD_MSG_CLASS1; class < MD_MN_NCLASSES; class++) {
2851                         mdmn_mark_class_unlocked(setno, class);
2852                 }
2853         }
2854         (void) mutex_unlock(&mdmn_busy_mutex[setno]);
2855 
2856         *retval = MDMNE_ACK;
2857         return (retval);
2858 }
2859 
2860 /*
2861  * mdmn_comm_suspend_svc_2(setno, class)
2862  *
2863  * Drain all outstanding messages for a given set/class combination
2864  * and don't allow new messages to be processed.
2865  *
2866  * Special messages of class MD_MSG_CLASS0 can never be locked.
2867  *      e.g. MD_MN_MSG_VERBOSITY
2868  *
2869  * 1 <= setno < MD_MAXSETS        or setno == MD_COMM_ALL_SETS
2870  * 1 <= class < MD_MN_NCLASSES    or class == MD_COMM_ALL_CLASSES
2871  *
2872  * If class _is_not_ MD_COMM_ALL_CLASSES, then we simply mark this
2873  * one class as being suspended.
2874  * If messages for this class are currently on their way,
2875  * MDMNE_SET_NOT_DRAINED is returned. Otherwise MDMNE_ACK is returned.
2876  *
2877  * If class _is_ MD_COMM_ALL_CLASSES we drain all classes of this set.
2878  * Messages must be generated in ascending order.
2879  * This means, a message cannot create submessages with the same or lower class.
2880  * Draining messages must go from 1 to NCLASSES in order to ensure we don't
2881  * generate a hanging situation here.
2882  * We mark class 1 as being suspended.
2883  * if the class is not busy, we proceed with class 2
2884  * and so on
2885  * if a class *is* busy, we cannot continue here, but return
2886  * MDMNE_SET_NOT_DRAINED.
2887  * We expect the caller to hold on for some seconds and try again.
2888  * When that message, that held the class busy is done in
2889  * mdmn_master_process_msg(), mdmn_mark_class_unbusy() called.
2890  * There it is checked if the class is about to drain.
2891  * In that case it tries to drain all higher classes there.
2892  *
2893  * If setno is MD_COMM_ALL_SETS then we perform this on all possible sets.
2894  * In that case we return MDMNE_SET_NOT_DRAINED if not all sets are
2895  * completely drained.
2896  *
2897  * Returns:
2898  *      MDMNE_ACK on sucess (set is drained, no outstanding messages)
2899  *      MDMNE_SET_NOT_DRAINED  if drain process is started, but there are
2900  *              still outstanding messages for this set(s)
2901  *      MDMNE_EINVAL if setno is out of range
2902  *      MDMNE_NOT_JOINED if the set is not yet initialized on this node
2903  */
2904 
2905 /* ARGSUSED */
2906 int *
2907 mdmn_comm_suspend_svc_2(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
2908 {
2909         int                     *retval;
2910         int                     failure = 0;
2911         set_t                   startset, endset;
2912         set_t                   setno  = msc->msc_set;
2913         md_mn_msgclass_t        oclass = msc->msc_class;
2914 #ifdef NOT_YET_NEEDED
2915         uint_t                  flags  = msc->msc_flags;
2916 #endif /* NOT_YET_NEEDED */
2917         md_mn_msgclass_t        class;
2918 
2919         retval = Malloc(sizeof (int));
2920 
2921         /* check if the global initialization is done */
2922         if ((md_commd_global_state & MD_CGS_INITED) == 0) {
2923                 global_init();
2924         }
2925 
2926         /* is this rpc request coming from the local node ? */
2927         if (check_license(rqstp, 0) == FALSE) {
2928                 xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
2929                 *retval = MDMNE_RPC_FAIL;
2930                 return (retval);
2931         }
2932 
2933         commd_debug(MD_MMV_MISC, "suspend: called for set=%d class=%d\n",
2934             setno, oclass);
2935 
2936         /* Perform some range checking */
2937         if (setno >= MD_MAXSETS) {
2938                 *retval = MDMNE_EINVAL;
2939                 commd_debug(MD_MMV_MISC, "suspend: returning MDMNE_EINVAL\n");
2940                 return (retval);
2941         }
2942 
2943         /*  setno == MD_COMM_ALL_SETS means: we walk thru all possible sets. */
2944         if (setno == MD_COMM_ALL_SETS) {
2945                 startset = 1;
2946                 endset = MD_MAXSETS - 1;
2947         } else {
2948                 startset = setno;
2949                 endset = setno;
2950         }
2951 
2952         for (setno = startset; setno <= endset; setno++) {
2953                 /* Here we need the mutexes for the set to be setup */
2954                 if (md_mn_set_inited[setno] != MDMN_SET_MUTEXES) {
2955                         (void) mdmn_init_set(setno, MDMN_SET_MUTEXES);
2956                 }
2957 
2958                 (void) mutex_lock(&mdmn_busy_mutex[setno]);
2959                 /* shall we drain all classes of this set? */
2960                 if (oclass == MD_COMM_ALL_CLASSES) {
2961                         for (class = 1; class < MD_MN_NCLASSES; class ++) {
2962                                 commd_debug(MD_MMV_MISC,
2963                                     "suspend: suspending set %d, class %d\n",
2964                                     setno, class);
2965                                 *retval = mdmn_mark_class_suspended(setno,
2966                                     class, MDMN_SUSPEND_ALL);
2967                                 if (*retval == MDMNE_SET_NOT_DRAINED) {
2968                                         failure++;
2969                                 }
2970                         }
2971                 } else {
2972                         /* only drain one specific class */
2973                         commd_debug(MD_MMV_MISC,
2974                             "suspend: suspending set=%d class=%d\n",
2975                             setno, oclass);
2976                         *retval = mdmn_mark_class_suspended(setno, oclass,
2977                             MDMN_SUSPEND_1);
2978                         if (*retval == MDMNE_SET_NOT_DRAINED) {
2979                                 failure++;
2980                         }
2981                 }
2982                 (void) mutex_unlock(&mdmn_busy_mutex[setno]);
2983         }
2984         /* If one or more sets are not entirely drained, failure is non-zero */
2985         if (failure != 0) {
2986                 *retval = MDMNE_SET_NOT_DRAINED;
2987                 commd_debug(MD_MMV_MISC,
2988                     "suspend: returning MDMNE_SET_NOT_DRAINED\n");
2989         } else {
2990                 *retval = MDMNE_ACK;
2991         }
2992 
2993         return (retval);
2994 }
2995 
2996 /*
2997  * mdmn_comm_resume_svc_2(setno, class)
2998  *
2999  * Resume processing messages for a given set.
3000  * This incorporates the repeal of a previous suspend operation.
3001  *
3002  * 1 <= setno < MD_MAXSETS        or setno == MD_COMM_ALL_SETS
3003  * 1 <= class < MD_MN_NCLASSES    or class == MD_COMM_ALL_CLASSES
3004  *
3005  * If class _is_not_ MD_COMM_ALL_CLASSES, then we simply mark this
3006  * one class as being resumed.
3007  *
3008  * If class _is_ MD_COMM_ALL_CLASSES we resume all classes of this set.
3009  *
3010  * If setno is MD_COMM_ALL_SETS then we perform this on all possible sets.
3011  *
3012  * If both setno is MD_COMM_ALL_SETS and class is MD_COMM_ALL_CLASSES we also
3013  * reset any ABORT flag from the global state.
3014  *
3015  * Returns:
3016  *      MDMNE_ACK on sucess (resuming an unlocked set is Ok)
3017  *      MDMNE_EINVAL if setno is out of range
3018  *      MDMNE_NOT_JOINED if the set is not yet initialized on this node
3019  */
3020 /* ARGSUSED */
3021 int *
3022 mdmn_comm_resume_svc_2(md_mn_set_and_class_t *msc, struct svc_req *rqstp)
3023 {
3024         int                     *retval;
3025         set_t                   startset, endset;
3026         set_t                   setno  = msc->msc_set;
3027         md_mn_msgclass_t        oclass = msc->msc_class;
3028         uint_t                  flags  = msc->msc_flags;
3029         md_mn_msgclass_t        class;
3030 
3031         retval = Malloc(sizeof (int));
3032 
3033         /* check if the global initialization is done */
3034         if ((md_commd_global_state & MD_CGS_INITED) == 0) {
3035                 global_init();
3036         }
3037 
3038         /* is this rpc request coming from the local node ? */
3039         if (check_license(rqstp, 0) == FALSE) {
3040                 xdr_free(xdr_md_mn_set_and_class_t, (caddr_t)msc);
3041                 *retval = MDMNE_RPC_FAIL;
3042                 return (retval);
3043         }
3044 
3045         commd_debug(MD_MMV_MISC, "resume: called for set=%d class=%d\n",
3046             setno, oclass);
3047 
3048         /* Perform some range checking */
3049         if (setno > MD_MAXSETS) {
3050                 *retval = MDMNE_EINVAL;
3051                 return (retval);
3052         }
3053 
3054         if (setno == MD_COMM_ALL_SETS) {
3055                 startset = 1;
3056                 endset = MD_MAXSETS - 1;
3057                 if (oclass == MD_COMM_ALL_CLASSES) {
3058                         /* This is the point where we "unabort" the commd */
3059                         commd_debug(MD_MMV_MISC, "resume: resetting ABORT\n");
3060                         md_commd_global_state &= ~MD_CGS_ABORTED;
3061                 }
3062         } else {
3063                 startset = setno;
3064                 endset = setno;
3065         }
3066 
3067         for (setno = startset; setno <= endset; setno++) {
3068 
3069                 /* Here we need the mutexes for the set to be setup */
3070                 if ((md_mn_set_inited[setno] & MDMN_SET_MUTEXES) == 0) {
3071                         (void) mdmn_init_set(setno, MDMN_SET_MUTEXES);
3072                 }
3073 
3074                 (void) mutex_lock(&mdmn_busy_mutex[setno]);
3075 
3076                 if (oclass == MD_COMM_ALL_CLASSES) {
3077                         int end_class = 1;
3078                         /*
3079                          * When SUSPENDing all classes, we go
3080                          * from 1 to MD_MN_NCLASSES-1
3081                          * The correct reverse action is RESUMing
3082                          * from MD_MN_NCLASSES-1 to 1 (or 2)
3083                          */
3084 
3085                         if (flags & MD_MSCF_DONT_RESUME_CLASS1) {
3086                                 end_class = 2;
3087                         }
3088 
3089                         /*
3090                          * Then mark all classes of this set as no longer
3091                          * suspended. This supersedes any previous suspend(1)
3092                          * calls and resumes the set entirely.
3093                          */
3094                         for (class = MD_MN_NCLASSES - 1; class >= end_class;
3095                             class --) {
3096                                 commd_debug(MD_MMV_MISC,
3097                                     "resume: resuming set=%d class=%d\n",
3098                                     setno, class);
3099                                 mdmn_mark_class_resumed(setno, class,
3100                                     (MDMN_SUSPEND_ALL | MDMN_SUSPEND_1));
3101                         }
3102                 } else {
3103                         /*
3104                          * In this case only one class is marked as not
3105                          * suspended. If a suspend(all) is currently active for
3106                          * this set, this class will still be suspended.
3107                          * That state will be cleared by a suspend(all)
3108                          * (see above)
3109                          */
3110                         commd_debug(MD_MMV_MISC,
3111                             "resume: resuming set=%d class=%d\n",
3112                             setno, oclass);
3113                         mdmn_mark_class_resumed(setno, oclass, MDMN_SUSPEND_1);
3114                 }
3115 
3116                 (void) mutex_unlock(&mdmn_busy_mutex[setno]);
3117         }
3118 
3119         *retval = MDMNE_ACK;
3120         return (retval);
3121 }
3122 /* ARGSUSED */
3123 int *
3124 mdmn_comm_reinit_set_svc_2(set_t *setnop, struct svc_req *rqstp)
3125 {
3126         int             *retval;
3127         md_mnnode_desc  *node;
3128         set_t            setno = *setnop;
3129 
3130         retval = Malloc(sizeof (int));
3131 
3132         /* check if the global initialization is done */
3133         if ((md_commd_global_state & MD_CGS_INITED) == 0) {
3134                 global_init();
3135         }
3136 
3137         /* is this rpc request coming from the local node ? */
3138         if (check_license(rqstp, 0) == FALSE) {
3139                 xdr_free(xdr_set_t, (caddr_t)setnop);
3140                 *retval = MDMNE_RPC_FAIL;
3141                 return (retval);
3142         }
3143 
3144         commd_debug(MD_MMV_MISC, "reinit: set=%d\n", setno);
3145 
3146         (void) rw_rdlock(&set_desc_rwlock[setno]);
3147         /*
3148          * We assume, that all messages have been suspended previously.
3149          *
3150          * As we are modifying lots of clients here we grab the client_rwlock
3151          * in writer mode. This ensures, no new messages come in.
3152          */
3153         (void) rw_wrlock(&client_rwlock[setno]);
3154         /* This set is no longer initialized */
3155 
3156         if ((set_descriptor[setno] != NULL) &&
3157             (md_mn_set_inited[setno] & MDMN_SET_NODES)) {
3158                 /* destroy all rpc clients from this set */
3159                 for (node = set_descriptor[setno]->sd_nodelist; node;
3160                     node = node->nd_next) {
3161                         /*
3162                          * Since the CLIENT for ourself will be recreated
3163                          * shortly, and this node is guaranteed to be
3164                          * there after a reconfig, there's no reason to go
3165                          * through destroying it.  It also avoids an issue
3166                          * with calling clnt_create() later from within the
3167                          * server thread, which can effectively deadlock
3168                          * itself due to RPC design limitations.
3169                          */
3170                         if (node == set_descriptor[setno]->sd_mn_mynode)
3171                                 continue;
3172                         mdmn_clnt_destroy(client[setno][node->nd_nodeid]);
3173                         if (client[setno][node->nd_nodeid] != (CLIENT *)NULL) {
3174                                 client[setno][node->nd_nodeid] = (CLIENT *)NULL;
3175                         }
3176                 }
3177                 md_mn_set_inited[setno] &= ~MDMN_SET_NODES;
3178         }
3179 
3180         commd_debug(MD_MMV_MISC, "reinit: done init_set(%d)\n", setno);
3181 
3182         (void) rw_unlock(&client_rwlock[setno]);
3183         (void) rw_unlock(&set_desc_rwlock[setno]);
3184         *retval = MDMNE_ACK;
3185         return (retval);
3186 }
3187 
3188 /*
3189  * This is just an interface for testing purpose.
3190  * Here we can disable single message types.
3191  * If we block a message type, this is valid for all MN sets.
3192  * If a message arrives later, and  it's message type is blocked, it will
3193  * be returned immediately with MDMNE_CLASS_LOCKED, which causes the sender to
3194  * resend this message over and over again.
3195  */
3196 
3197 /* ARGSUSED */
3198 int *
3199 mdmn_comm_msglock_svc_2(md_mn_type_and_lock_t *mmtl, struct svc_req *rqstp)
3200 {
3201         int                     *retval;
3202         md_mn_msgtype_t         type = mmtl->mmtl_type;
3203         uint_t                  lock = mmtl->mmtl_lock;
3204 
3205         retval = Malloc(sizeof (int));
3206 
3207         /* check if the global initialization is done */
3208         if ((md_commd_global_state & MD_CGS_INITED) == 0) {
3209                 global_init();
3210         }
3211 
3212         /* is this rpc request coming from the local node ? */
3213         if (check_license(rqstp, 0) == FALSE) {
3214                 xdr_free(xdr_md_mn_type_and_lock_t, (caddr_t)mmtl);
3215                 *retval = MDMNE_RPC_FAIL;
3216                 return (retval);
3217         }
3218 
3219         /* Perform some range checking */
3220         if ((type == 0) || (type >= MD_MN_NMESSAGES)) {
3221                 *retval = MDMNE_EINVAL;
3222                 return (retval);
3223         }
3224 
3225         commd_debug(MD_MMV_MISC, "msglock: type=%d, lock=%d\n", type, lock);
3226         msgtype_lock_state[type] = lock;
3227 
3228         *retval = MDMNE_ACK;
3229         return (retval);
3230 }