1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
  23  * Use is subject to license terms.
  24  */
  25 
  26 #ifdef DISTRIBUTED
  27 /*
  28  *      dist.cc
  29  *
  30  *      Deal with the distributed processing
  31  */
  32 
  33 #include <avo/err.h>
  34 #include <avo/find_dir.h>
  35 #include <avo/util.h>
  36 #include <dm/Avo_AcknowledgeMsg.h>
  37 #include <dm/Avo_DoJobMsg.h>
  38 #include <dm/Avo_JobResultMsg.h>
  39 #include <mk/defs.h>
  40 #include <mksh/misc.h>            /* getmem() */
  41 #include <rw/pstream.h>
  42 #include <rw/queuecol.h>
  43 #include <rw/xdrstrea.h>
  44 #include <signal.h>
  45 #ifdef linux
  46 #include <sstream>
  47 using namespace std;
  48 #else
  49 #include <strstream.h>
  50 #endif
  51 #include <sys/stat.h>             /* stat() */
  52 #include <sys/types.h>
  53 #include <sys/wait.h>
  54 #include <unistd.h>
  55 #include <errno.h>
  56 
  57 /*
  58  * Defined macros
  59  */
  60 
  61 #define AVO_BLOCK_INTERUPTS sigfillset(&newset) ; \
  62         sigprocmask(SIG_SETMASK, &newset, &oldset)
  63  
  64 #define AVO_UNBLOCK_INTERUPTS \
  65         sigprocmask(SIG_SETMASK, &oldset, &newset)
  66 
  67 
  68 /*
  69  * typedefs & structs
  70  */
  71 
  72 /*
  73  * Static variables
  74  */
  75 int             dmake_ifd; 
  76 FILE*           dmake_ifp; 
  77 XDR             xdrs_in;
  78 
  79 int             dmake_ofd;
  80 FILE*           dmake_ofp;
  81 XDR             xdrs_out;
  82 
  83 // These instances are required for the RWfactory.
  84 static Avo_JobResultMsg dummyJobResultMsg;
  85 static Avo_AcknowledgeMsg dummyAcknowledgeMsg;
  86 static int firstAcknowledgeReceived = 0;
  87  
  88 int             rxmPid = 0;
  89 
  90 /*
  91  * File table of contents
  92  */
  93 static void     set_dmake_env_vars(void);
  94 
  95 /*
  96  * void
  97  * startup_rxm(void)
  98  *
  99  * When startup_rxm() is called, read_command_options() and
 100  * read_files_and_state() have already been read, so DMake
 101  * will now know what options to pass to rxm.
 102  *
 103  * rxm
 104  *     [ -k ] [ -n ]
 105  *     [ -c <dmake_rcfile> ]
 106  *     [ -g <dmake_group> ]
 107  *     [ -j <dmake_max_jobs> ]
 108  *     [ -m <dmake_mode> ]
 109  *     [ -o <dmake_odir> ]
 110  *     <read_fd> <write_fd>
 111  *
 112  * rxm will, among other things, read the rc file.
 113  *
 114  */
 115 void
 116 startup_rxm(void)
 117 {
 118         Name            dmake_name;
 119         Name            dmake_value;
 120         Avo_err         *find_run_dir_err;
 121         int             pipe1[2], pipe2[2];
 122         Property        prop;
 123         char            *run_dir;
 124         char            rxm_command[MAXPATHLEN];
 125         int             rxm_debug = 0;
 126 
 127         int             length;
 128         char *          env;
 129 
 130         firstAcknowledgeReceived = 0;
 131         /*
 132          * Create two pipes, one for dmake->rxm, and one for rxm->dmake.
 133          * pipe1 is dmake->rxm,
 134          * pipe2 is rxm->dmake.
 135          */
 136         if ((pipe(pipe1) < 0) || (pipe(pipe2) < 0)) {
 137                 fatal(catgets(catd, 1, 245, "pipe() failed: %s"), errmsg(errno));
 138         }
 139 
 140         set_dmake_env_vars();
 141 
 142         if ((rxmPid = fork()) < 0) { /* error */
 143                 fatal(catgets(catd, 1, 246, "fork() failed: %s"), errmsg(errno));
 144         } else if (rxmPid > 0) {     /* parent, dmake */
 145                 dmake_ofd = pipe1[1];   // write side of pipe
 146                 if (!(dmake_ofp = fdopen(dmake_ofd, "a"))) {
 147                         fatal(catgets(catd, 1, 247, "fdopen() failed: %s"), errmsg(errno));
 148                 }
 149                 xdrstdio_create(&xdrs_out, dmake_ofp, XDR_ENCODE);
 150 
 151                 dmake_ifd = pipe2[0];   // read side of pipe
 152                 if (!(dmake_ifp = fdopen(dmake_ifd, "r"))) {
 153                         fatal(catgets(catd, 1, 248, "fdopen() failed: %s"), errmsg(errno));
 154                 }
 155                 xdrstdio_create(&xdrs_in, dmake_ifp, XDR_DECODE);
 156 
 157                 close(pipe1[0]);        // read side
 158                 close(pipe2[1]);        // write side
 159         } else {                        /* child, rxm */
 160                 close(pipe1[1]);        // write side
 161                 close(pipe2[0]);        // read side
 162 
 163                 /* Find the run directory of dmake, for rxm. */
 164                 find_run_dir_err = avo_find_run_dir(&run_dir);
 165                 if (find_run_dir_err) {
 166                         delete find_run_dir_err;
 167                         /* Use the path to find rxm. */
 168                         (void) sprintf(rxm_command, NOCATGETS("rxm"));
 169                 } else {
 170                         /* Use the run dir of dmake for rxm. */
 171                         (void) sprintf(rxm_command, NOCATGETS("%s/rxm"), run_dir);
 172                 }
 173 
 174                 if (continue_after_error) {
 175                         (void) strcat(rxm_command, NOCATGETS(" -k"));
 176                 }
 177                 if (do_not_exec_rule) {
 178                         (void) strcat(rxm_command, NOCATGETS(" -n"));
 179                 }
 180                 if (rxm_debug) {
 181                         (void) strcat(rxm_command, NOCATGETS(" -S"));
 182                 }
 183                 if (send_mtool_msgs) {
 184                         (void) sprintf(&rxm_command[strlen(rxm_command)],
 185                                        NOCATGETS(" -O %d"),
 186                                        mtool_msgs_fd);
 187                 }
 188                 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_RCFILE"));
 189                 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
 190                 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
 191                     ((dmake_value = prop->body.macro.value) != NULL)) {
 192                         (void) sprintf(&rxm_command[strlen(rxm_command)],
 193                                        NOCATGETS(" -c %s"),
 194                                        dmake_value->string_mb);
 195                 } else {
 196                         length = 2 + strlen(NOCATGETS("DMAKE_RCFILE"));
 197                         env = getmem(length);
 198                         (void) sprintf(env,
 199                                        "%s=",
 200                                        NOCATGETS("DMAKE_RCFILE"));
 201                         (void) putenv(env);
 202                 }
 203                 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_GROUP"));
 204                 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
 205                 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
 206                     ((dmake_value = prop->body.macro.value) != NULL)) {
 207                         (void) sprintf(&rxm_command[strlen(rxm_command)],
 208                                        NOCATGETS(" -g %s"),
 209                                        dmake_value->string_mb);
 210                 } else {
 211                         length = 2 + strlen(NOCATGETS("DMAKE_GROUP"));
 212                         env = getmem(length);
 213                         (void) sprintf(env,
 214                                        "%s=",
 215                                        NOCATGETS("DMAKE_GROUP"));
 216                         (void) putenv(env);
 217                 }
 218                 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_MAX_JOBS"));
 219                 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
 220                 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
 221                     ((dmake_value = prop->body.macro.value) != NULL)) {
 222                         (void) sprintf(&rxm_command[strlen(rxm_command)],
 223                                        NOCATGETS(" -j %s"),
 224                                        dmake_value->string_mb);
 225                 } else {
 226                         length = 2 + strlen(NOCATGETS("DMAKE_MAX_JOBS"));
 227                         env = getmem(length);
 228                         (void) sprintf(env,
 229                                        "%s=",
 230                                        NOCATGETS("DMAKE_MAX_JOBS"));
 231                         (void) putenv(env);
 232                 }
 233                 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_MODE"));
 234                 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
 235                 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
 236                     ((dmake_value = prop->body.macro.value) != NULL)) {
 237                         (void) sprintf(&rxm_command[strlen(rxm_command)],
 238                                        NOCATGETS(" -m %s"),
 239                                        dmake_value->string_mb);
 240                 } else {
 241                         length = 2 + strlen(NOCATGETS("DMAKE_MODE"));
 242                         env = getmem(length);
 243                         (void) sprintf(env,
 244                                        "%s=",
 245                                        NOCATGETS("DMAKE_MODE"));
 246                         (void) putenv(env);
 247                 }
 248                 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_ODIR"));
 249                 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
 250                 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
 251                     ((dmake_value = prop->body.macro.value) != NULL)) {
 252                         (void) sprintf(&rxm_command[strlen(rxm_command)],
 253                                        NOCATGETS(" -o %s"),
 254                                        dmake_value->string_mb);
 255                 } else {
 256                         length = 2 + strlen(NOCATGETS("DMAKE_ODIR"));
 257                         env = getmem(length);
 258                         (void) sprintf(env,
 259                                        "%s=",
 260                                        NOCATGETS("DMAKE_ODIR"));
 261                         (void) putenv(env);
 262                 }
 263 
 264                 (void) sprintf(&rxm_command[strlen(rxm_command)],
 265                                NOCATGETS(" %d %d"),
 266                                pipe1[0], pipe2[1]);
 267 #ifdef linux
 268                 execl(NOCATGETS("/bin/sh"),
 269 #else
 270                 execl(NOCATGETS("/usr/bin/sh"),
 271 #endif
 272                       NOCATGETS("sh"),
 273                       NOCATGETS("-c"),
 274                       rxm_command,
 275                       (char *)NULL);
 276                 _exit(127);
 277         }
 278 }
 279 
 280 /*
 281  * static void
 282  * set_dmake_env_vars()
 283  *
 284  * Sets the DMAKE_* environment variables for rxm and rxs.
 285  *      DMAKE_PWD
 286  *      DMAKE_NPWD
 287  *      DMAKE_UMASK
 288  *      DMAKE_SHELL
 289  */
 290 static void
 291 set_dmake_env_vars()
 292 {
 293         char            *current_netpath;
 294         char            *current_path;
 295         static char     *env;
 296         int             length;
 297         char            netpath[MAXPATHLEN];
 298         mode_t          um;
 299         char            um_buf[MAXPATHLEN];
 300         Name            dmake_name;
 301         Name            dmake_value;
 302         Property        prop;
 303 
 304 #ifdef REDIRECT_ERR
 305         /* Set __DMAKE_REDIRECT_STDERR */
 306         length = 2 + strlen(NOCATGETS("__DMAKE_REDIRECT_STDERR")) + 1;
 307         env = getmem(length);
 308         (void) sprintf(env,
 309                        "%s=%s",
 310                        NOCATGETS("__DMAKE_REDIRECT_STDERR"),
 311                        out_err_same ? NOCATGETS("0") : NOCATGETS("1"));
 312         (void) putenv(env);
 313 #endif
 314 
 315         /* Set DMAKE_PWD to the current working directory */
 316         current_path = get_current_path();
 317         length = 2 + strlen(NOCATGETS("DMAKE_PWD")) + strlen(current_path);
 318         env = getmem(length);
 319         (void) sprintf(env,
 320                        "%s=%s",
 321                        NOCATGETS("DMAKE_PWD"),
 322                        current_path);
 323         (void) putenv(env);
 324 
 325         /* Set DMAKE_NPWD to machine:pathname */
 326         if (avo_path_to_netpath(current_path, netpath)) {
 327                 current_netpath = netpath;
 328         } else {
 329                 current_netpath = current_path;
 330         }
 331         length = 2 + strlen(NOCATGETS("DMAKE_NPWD")) + strlen(current_netpath);
 332         env = getmem(length);
 333         (void) sprintf(env,
 334                        "%s=%s",
 335                        NOCATGETS("DMAKE_NPWD"),
 336                        current_netpath);
 337         (void) putenv(env);
 338 
 339         /* Set DMAKE_UMASK to the value of umask */
 340         um = umask(0);
 341         umask(um);
 342         (void) sprintf(um_buf, NOCATGETS("%ul"), um);
 343         length = 2 + strlen(NOCATGETS("DMAKE_UMASK")) + strlen(um_buf);
 344         env = getmem(length);
 345         (void) sprintf(env,
 346                        "%s=%s",
 347                        NOCATGETS("DMAKE_UMASK"),
 348                        um_buf);
 349         (void) putenv(env);
 350 
 351         if (((prop = get_prop(shell_name->prop, macro_prop)) != NULL) &&
 352             ((dmake_value = prop->body.macro.value) != NULL)) {
 353                 length = 2 + strlen(NOCATGETS("DMAKE_SHELL")) +
 354                          strlen(dmake_value->string_mb);
 355                 env = getmem(length);
 356                 (void) sprintf(env,
 357                            "%s=%s",
 358                            NOCATGETS("DMAKE_SHELL"),
 359                            dmake_value->string_mb);
 360                 (void) putenv(env);
 361         }
 362         MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_OUTPUT_MODE"));
 363         dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
 364         if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
 365             ((dmake_value = prop->body.macro.value) != NULL)) {
 366                 length = 2 + strlen(NOCATGETS("DMAKE_OUTPUT_MODE")) +
 367                          strlen(dmake_value->string_mb);
 368                 env = getmem(length);
 369                 (void) sprintf(env,
 370                            "%s=%s",
 371                            NOCATGETS("DMAKE_OUTPUT_MODE"),
 372                            dmake_value->string_mb);
 373                 (void) putenv(env);
 374         }
 375 }
 376 
 377 /*
 378  * void
 379  * distribute_rxm(Avo_DoJobMsg *dmake_job_msg)
 380  *
 381  * Write the DMake rule to be distributed down the pipe to rxm.
 382  *
 383  */
 384 void
 385 distribute_rxm(Avo_DoJobMsg *dmake_job_msg)
 386 {
 387         /* Add all dynamic env vars to the dmake_job_msg. */
 388         setvar_envvar(dmake_job_msg);
 389 
 390         /*
 391          * Copying dosys()...
 392          * Stat .make.state to see if we'll need to reread it later
 393          */
 394         make_state->stat.time = file_no_time;
 395         (void)exists(make_state);
 396         make_state_before = make_state->stat.time;
 397 
 398         // Wait for the first Acknowledge message from the rxm process
 399         // before sending the first message.
 400         if (!firstAcknowledgeReceived) {
 401                 firstAcknowledgeReceived++;
 402                 Avo_AcknowledgeMsg *msg = getAcknowledgeMsg();
 403                 if (msg) {
 404                         delete msg;
 405                 }
 406         }
 407 
 408         RWCollectable *doJobMsg = (RWCollectable *)dmake_job_msg;
 409         sigset_t newset;
 410         sigset_t oldset;
 411 
 412         AVO_BLOCK_INTERUPTS;
 413         int xdrResult = xdr(&xdrs_out, doJobMsg);
 414 
 415         if (xdrResult) {
 416                 fflush(dmake_ofp);
 417                 AVO_UNBLOCK_INTERUPTS;
 418         } else {
 419                 AVO_UNBLOCK_INTERUPTS;
 420                 fatal(catgets(catd, 1, 249, "Couldn't send the job request to rxm"));
 421         }
 422 
 423         delete dmake_job_msg;
 424 }
 425 
 426 // Queue for JobResult messages.
 427 static RWSlistCollectablesQueue jobResultQueue;
 428 
 429 // Queue for Acknowledge messages.
 430 static RWSlistCollectablesQueue acknowledgeQueue;
 431 
 432 // Read a message from the rxm process, and put it into a queue, by
 433 // message type. Return the message type.
 434 
 435 int
 436 getRxmMessage(void)
 437 {
 438         RWCollectable   *msg = (RWCollectable *)0;
 439         int             msgType = 0;
 440 //      sigset_t        newset;
 441 //      sigset_t        oldset;
 442 
 443         // It seems unnecessarily to block interrupts here because
 444         // any nonignored signal means exit for dmake in distributed mode.
 445 //      AVO_BLOCK_INTERUPTS;
 446         int xdrResult = xdr(&xdrs_in, msg);
 447 //      AVO_UNBLOCK_INTERUPTS;
 448 
 449         if (xdrResult) {
 450                 switch(msg->isA()) {
 451                 case __AVO_ACKNOWLEDGEMSG:
 452                         acknowledgeQueue.append(msg);
 453                         msgType = __AVO_ACKNOWLEDGEMSG;
 454                         break;
 455                 case __AVO_JOBRESULTMSG:
 456                         jobResultQueue.append(msg);
 457                         msgType = __AVO_JOBRESULTMSG;
 458                         break;
 459                 default:
 460                         warning(catgets(catd, 1, 291, "Unknown message on rxm input fd"));
 461                         msgType = 0;
 462                         break;
 463                 }
 464         } else {
 465                 if (errno == EINTR) {
 466                         fputs(NOCATGETS("dmake: Internal error: xdr() has been interrupted by a signal.\n"), stderr);
 467                 }
 468                 fatal(catgets(catd, 1, 250, "Couldn't receive message from rxm"));
 469         }
 470 
 471         return msgType;
 472 }
 473 
 474 // Get a JobResult message from it's queue, and
 475 // if the queue is empty, call the getRxmMessage() function until
 476 // a JobResult message shows.
 477 
 478 Avo_JobResultMsg *
 479 getJobResultMsg(void)
 480 {
 481         RWCollectable *msg = 0;
 482 
 483         if (!(msg = jobResultQueue.get())) {
 484                 while (getRxmMessage() != __AVO_JOBRESULTMSG);
 485                 msg = jobResultQueue.get();
 486         }
 487 
 488         return (Avo_JobResultMsg *)msg;
 489 }
 490 
 491 // Get an Acknowledge message from it's queue, and
 492 // if the queue is empty, call the getRxmMessage() function until
 493 // a Acknowledge message shows.
 494 
 495 Avo_AcknowledgeMsg *
 496 getAcknowledgeMsg(void)
 497 {
 498         RWCollectable *msg = 0;
 499 
 500         if (!(msg = acknowledgeQueue.get())) {
 501                 while (getRxmMessage() != __AVO_ACKNOWLEDGEMSG);
 502                 msg = acknowledgeQueue.get();
 503         }
 504 
 505         return (Avo_AcknowledgeMsg *)msg;
 506 }
 507 
 508 
 509 /*
 510  * Doname
 511  * await_dist(Boolean waitflg)
 512  *
 513  * Waits for distributed children to exit and finishes their processing.
 514  * Rxm will send a msg down the pipe when a child is done.
 515  *
 516  */
 517 Doname
 518 await_dist(Boolean waitflg)
 519 {
 520         Avo_JobResultMsg        *dmake_result_msg;
 521         int                     job_msg_id;
 522         Doname                  result = build_ok;
 523         int                     result_msg_cmd_status;
 524         int                     result_msg_job_status;
 525         Running                 rp;
 526 
 527         while (!(dmake_result_msg = getJobResultMsg()));
 528         job_msg_id = dmake_result_msg->getId();
 529         result_msg_cmd_status = dmake_result_msg->getCmdStatus();
 530         result_msg_job_status = dmake_result_msg->getJobStatus();
 531 
 532         if (waitflg) {
 533                 result = (result_msg_cmd_status == 0) ? build_ok : build_failed;
 534 
 535 #ifdef PRINT_EXIT_STATUS
 536                 if (result == build_ok) {
 537                         warning(NOCATGETS("I'm in await_dist(), waitflg is true, and result is build_ok."));
 538                 } else {
 539                         warning(NOCATGETS("I'm in await_dist(), waitflg is true, and result is build_failed."));
 540                 }
 541 #endif
 542 
 543         } else {
 544                 for (rp = running_list;
 545                      (rp != NULL) && (job_msg_id != rp->job_msg_id);
 546                      rp = rp->next) {
 547                 }
 548                 if (rp == NULL) {
 549                         fatal(catgets(catd, 1, 251, "Internal error: returned child job_msg_id not in running_list"));
 550                 } else {
 551                         /* XXX - This may not be correct! */
 552                         if (result_msg_job_status == RETURNED) {
 553                                 rp->state = build_ok;
 554                         } else {
 555                                 rp->state = (result_msg_cmd_status == 0) ? build_ok : build_failed;
 556                         }
 557                         result = rp->state;
 558 
 559 #ifdef PRINT_EXIT_STATUS
 560                         if (result == build_ok) {
 561                                 warning(NOCATGETS("I'm in await_dist(), waitflg is false, and result is build_ok."));
 562                         } else {
 563                                 warning(NOCATGETS("I'm in await_dist(), waitflg is false, and result is build_failed."));
 564                         }
 565 #endif
 566 
 567                 }
 568                 parallel_process_cnt--;
 569         }
 570         delete dmake_result_msg;
 571         return result;
 572 }
 573 
 574 #endif
 575 
 576