1 /*
   2  * CDDL HEADER START
   3  *
   4  * The contents of this file are subject to the terms of the
   5  * Common Development and Distribution License (the "License").
   6  * You may not use this file except in compliance with the License.
   7  *
   8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
   9  * or http://www.opensolaris.org/os/licensing.
  10  * See the License for the specific language governing permissions
  11  * and limitations under the License.
  12  *
  13  * When distributing Covered Code, include this CDDL HEADER in each
  14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
  15  * If applicable, add the following below this CDDL HEADER, with the
  16  * fields enclosed by brackets "[]" replaced with your own identifying
  17  * information: Portions Copyright [yyyy] [name of copyright owner]
  18  *
  19  * CDDL HEADER END
  20  */
  21 /*
  22  * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
  23  * Use is subject to license terms.
  24  */
  25 /*
  26  * @(#)dist.cc 1.35 06/12/12
  27  */
  28 
  29 #pragma ident   "@(#)dist.cc    1.25    96/03/12"
  30 
  31 #ifdef DISTRIBUTED
  32 /*
  33  *      dist.cc
  34  *
  35  *      Deal with the distributed processing
  36  */
  37 
  38 #include <avo/err.h>
  39 #include <avo/find_dir.h>
  40 #include <avo/util.h>
  41 #include <dm/Avo_AcknowledgeMsg.h>
  42 #include <dm/Avo_DoJobMsg.h>
  43 #include <dm/Avo_JobResultMsg.h>
  44 #include <mk/defs.h>
  45 #include <mksh/misc.h>            /* getmem() */
  46 #include <rw/pstream.h>
  47 #include <rw/queuecol.h>
  48 #include <rw/xdrstrea.h>
  49 #include <signal.h>
  50 #ifdef linux
  51 #include <sstream>
  52 using namespace std;
  53 #else
  54 #include <strstream.h>
  55 #endif
  56 #include <sys/stat.h>             /* stat() */
  57 #include <sys/types.h>
  58 #include <sys/wait.h>
  59 #include <unistd.h>
  60 #include <errno.h>
  61 
  62 /*
  63  * Defined macros
  64  */
  65 
  66 #define AVO_BLOCK_INTERUPTS sigfillset(&newset) ; \
  67         sigprocmask(SIG_SETMASK, &newset, &oldset)
  68  
  69 #define AVO_UNBLOCK_INTERUPTS \
  70         sigprocmask(SIG_SETMASK, &oldset, &newset)
  71 
  72 
  73 /*
  74  * typedefs & structs
  75  */
  76 
  77 /*
  78  * Static variables
  79  */
  80 int             dmake_ifd; 
  81 FILE*           dmake_ifp; 
  82 XDR             xdrs_in;
  83 
  84 int             dmake_ofd;
  85 FILE*           dmake_ofp;
  86 XDR             xdrs_out;
  87 
  88 // These instances are required for the RWfactory.
  89 static Avo_JobResultMsg dummyJobResultMsg;
  90 static Avo_AcknowledgeMsg dummyAcknowledgeMsg;
  91 static int firstAcknowledgeReceived = 0;
  92  
  93 int             rxmPid = 0;
  94 
  95 /*
  96  * File table of contents
  97  */
  98 static void     set_dmake_env_vars(void);
  99 
 100 /*
 101  * void
 102  * startup_rxm(void)
 103  *
 104  * When startup_rxm() is called, read_command_options() and
 105  * read_files_and_state() have already been read, so DMake
 106  * will now know what options to pass to rxm.
 107  *
 108  * rxm
 109  *     [ -k ] [ -n ]
 110  *     [ -c <dmake_rcfile> ]
 111  *     [ -g <dmake_group> ]
 112  *     [ -j <dmake_max_jobs> ]
 113  *     [ -m <dmake_mode> ]
 114  *     [ -o <dmake_odir> ]
 115  *     <read_fd> <write_fd>
 116  *
 117  * rxm will, among other things, read the rc file.
 118  *
 119  */
 120 void
 121 startup_rxm(void)
 122 {
 123         Name            dmake_name;
 124         Name            dmake_value;
 125         Avo_err         *find_run_dir_err;
 126         int             pipe1[2], pipe2[2];
 127         Property        prop;
 128         char            *run_dir;
 129         char            rxm_command[MAXPATHLEN];
 130         int             rxm_debug = 0;
 131 
 132         int             length;
 133         char *          env;
 134 
 135         firstAcknowledgeReceived = 0;
 136         /*
 137          * Create two pipes, one for dmake->rxm, and one for rxm->dmake.
 138          * pipe1 is dmake->rxm,
 139          * pipe2 is rxm->dmake.
 140          */
 141         if ((pipe(pipe1) < 0) || (pipe(pipe2) < 0)) {
 142                 fatal(catgets(catd, 1, 245, "pipe() failed: %s"), errmsg(errno));
 143         }
 144 
 145         set_dmake_env_vars();
 146 
 147         if ((rxmPid = fork()) < 0) { /* error */
 148                 fatal(catgets(catd, 1, 246, "fork() failed: %s"), errmsg(errno));
 149         } else if (rxmPid > 0) {     /* parent, dmake */
 150                 dmake_ofd = pipe1[1];   // write side of pipe
 151                 if (!(dmake_ofp = fdopen(dmake_ofd, "a"))) {
 152                         fatal(catgets(catd, 1, 247, "fdopen() failed: %s"), errmsg(errno));
 153                 }
 154                 xdrstdio_create(&xdrs_out, dmake_ofp, XDR_ENCODE);
 155 
 156                 dmake_ifd = pipe2[0];   // read side of pipe
 157                 if (!(dmake_ifp = fdopen(dmake_ifd, "r"))) {
 158                         fatal(catgets(catd, 1, 248, "fdopen() failed: %s"), errmsg(errno));
 159                 }
 160                 xdrstdio_create(&xdrs_in, dmake_ifp, XDR_DECODE);
 161 
 162                 close(pipe1[0]);        // read side
 163                 close(pipe2[1]);        // write side
 164         } else {                        /* child, rxm */
 165                 close(pipe1[1]);        // write side
 166                 close(pipe2[0]);        // read side
 167 
 168                 /* Find the run directory of dmake, for rxm. */
 169                 find_run_dir_err = avo_find_run_dir(&run_dir);
 170                 if (find_run_dir_err) {
 171                         delete find_run_dir_err;
 172                         /* Use the path to find rxm. */
 173                         (void) sprintf(rxm_command, NOCATGETS("rxm"));
 174                 } else {
 175                         /* Use the run dir of dmake for rxm. */
 176                         (void) sprintf(rxm_command, NOCATGETS("%s/rxm"), run_dir);
 177                 }
 178 
 179                 if (continue_after_error) {
 180                         (void) strcat(rxm_command, NOCATGETS(" -k"));
 181                 }
 182                 if (do_not_exec_rule) {
 183                         (void) strcat(rxm_command, NOCATGETS(" -n"));
 184                 }
 185                 if (rxm_debug) {
 186                         (void) strcat(rxm_command, NOCATGETS(" -S"));
 187                 }
 188                 if (send_mtool_msgs) {
 189                         (void) sprintf(&rxm_command[strlen(rxm_command)],
 190                                        NOCATGETS(" -O %d"),
 191                                        mtool_msgs_fd);
 192                 }
 193                 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_RCFILE"));
 194                 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
 195                 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
 196                     ((dmake_value = prop->body.macro.value) != NULL)) {
 197                         (void) sprintf(&rxm_command[strlen(rxm_command)],
 198                                        NOCATGETS(" -c %s"),
 199                                        dmake_value->string_mb);
 200                 } else {
 201                         length = 2 + strlen(NOCATGETS("DMAKE_RCFILE"));
 202                         env = getmem(length);
 203                         (void) sprintf(env,
 204                                        "%s=",
 205                                        NOCATGETS("DMAKE_RCFILE"));
 206                         (void) putenv(env);
 207                 }
 208                 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_GROUP"));
 209                 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
 210                 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
 211                     ((dmake_value = prop->body.macro.value) != NULL)) {
 212                         (void) sprintf(&rxm_command[strlen(rxm_command)],
 213                                        NOCATGETS(" -g %s"),
 214                                        dmake_value->string_mb);
 215                 } else {
 216                         length = 2 + strlen(NOCATGETS("DMAKE_GROUP"));
 217                         env = getmem(length);
 218                         (void) sprintf(env,
 219                                        "%s=",
 220                                        NOCATGETS("DMAKE_GROUP"));
 221                         (void) putenv(env);
 222                 }
 223                 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_MAX_JOBS"));
 224                 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
 225                 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
 226                     ((dmake_value = prop->body.macro.value) != NULL)) {
 227                         (void) sprintf(&rxm_command[strlen(rxm_command)],
 228                                        NOCATGETS(" -j %s"),
 229                                        dmake_value->string_mb);
 230                 } else {
 231                         length = 2 + strlen(NOCATGETS("DMAKE_MAX_JOBS"));
 232                         env = getmem(length);
 233                         (void) sprintf(env,
 234                                        "%s=",
 235                                        NOCATGETS("DMAKE_MAX_JOBS"));
 236                         (void) putenv(env);
 237                 }
 238                 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_MODE"));
 239                 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
 240                 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
 241                     ((dmake_value = prop->body.macro.value) != NULL)) {
 242                         (void) sprintf(&rxm_command[strlen(rxm_command)],
 243                                        NOCATGETS(" -m %s"),
 244                                        dmake_value->string_mb);
 245                 } else {
 246                         length = 2 + strlen(NOCATGETS("DMAKE_MODE"));
 247                         env = getmem(length);
 248                         (void) sprintf(env,
 249                                        "%s=",
 250                                        NOCATGETS("DMAKE_MODE"));
 251                         (void) putenv(env);
 252                 }
 253                 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_ODIR"));
 254                 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
 255                 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
 256                     ((dmake_value = prop->body.macro.value) != NULL)) {
 257                         (void) sprintf(&rxm_command[strlen(rxm_command)],
 258                                        NOCATGETS(" -o %s"),
 259                                        dmake_value->string_mb);
 260                 } else {
 261                         length = 2 + strlen(NOCATGETS("DMAKE_ODIR"));
 262                         env = getmem(length);
 263                         (void) sprintf(env,
 264                                        "%s=",
 265                                        NOCATGETS("DMAKE_ODIR"));
 266                         (void) putenv(env);
 267                 }
 268 
 269                 (void) sprintf(&rxm_command[strlen(rxm_command)],
 270                                NOCATGETS(" %d %d"),
 271                                pipe1[0], pipe2[1]);
 272 #ifdef linux
 273                 execl(NOCATGETS("/bin/sh"),
 274 #else
 275                 execl(NOCATGETS("/usr/bin/sh"),
 276 #endif
 277                       NOCATGETS("sh"),
 278                       NOCATGETS("-c"),
 279                       rxm_command,
 280                       (char *)NULL);
 281                 _exit(127);
 282         }
 283 }
 284 
 285 /*
 286  * static void
 287  * set_dmake_env_vars()
 288  *
 289  * Sets the DMAKE_* environment variables for rxm and rxs.
 290  *      DMAKE_PWD
 291  *      DMAKE_NPWD
 292  *      DMAKE_UMASK
 293  *      DMAKE_SHELL
 294  */
 295 static void
 296 set_dmake_env_vars()
 297 {
 298         char            *current_netpath;
 299         char            *current_path;
 300         static char     *env;
 301         int             length;
 302         char            netpath[MAXPATHLEN];
 303         mode_t          um;
 304         char            um_buf[MAXPATHLEN];
 305         Name            dmake_name;
 306         Name            dmake_value;
 307         Property        prop;
 308 
 309 #ifdef REDIRECT_ERR
 310         /* Set __DMAKE_REDIRECT_STDERR */
 311         length = 2 + strlen(NOCATGETS("__DMAKE_REDIRECT_STDERR")) + 1;
 312         env = getmem(length);
 313         (void) sprintf(env,
 314                        "%s=%s",
 315                        NOCATGETS("__DMAKE_REDIRECT_STDERR"),
 316                        out_err_same ? NOCATGETS("0") : NOCATGETS("1"));
 317         (void) putenv(env);
 318 #endif
 319 
 320         /* Set DMAKE_PWD to the current working directory */
 321         current_path = get_current_path();
 322         length = 2 + strlen(NOCATGETS("DMAKE_PWD")) + strlen(current_path);
 323         env = getmem(length);
 324         (void) sprintf(env,
 325                        "%s=%s",
 326                        NOCATGETS("DMAKE_PWD"),
 327                        current_path);
 328         (void) putenv(env);
 329 
 330         /* Set DMAKE_NPWD to machine:pathname */
 331         if (avo_path_to_netpath(current_path, netpath)) {
 332                 current_netpath = netpath;
 333         } else {
 334                 current_netpath = current_path;
 335         }
 336         length = 2 + strlen(NOCATGETS("DMAKE_NPWD")) + strlen(current_netpath);
 337         env = getmem(length);
 338         (void) sprintf(env,
 339                        "%s=%s",
 340                        NOCATGETS("DMAKE_NPWD"),
 341                        current_netpath);
 342         (void) putenv(env);
 343 
 344         /* Set DMAKE_UMASK to the value of umask */
 345         um = umask(0);
 346         umask(um);
 347         (void) sprintf(um_buf, NOCATGETS("%ul"), um);
 348         length = 2 + strlen(NOCATGETS("DMAKE_UMASK")) + strlen(um_buf);
 349         env = getmem(length);
 350         (void) sprintf(env,
 351                        "%s=%s",
 352                        NOCATGETS("DMAKE_UMASK"),
 353                        um_buf);
 354         (void) putenv(env);
 355 
 356         if (((prop = get_prop(shell_name->prop, macro_prop)) != NULL) &&
 357             ((dmake_value = prop->body.macro.value) != NULL)) {
 358                 length = 2 + strlen(NOCATGETS("DMAKE_SHELL")) +
 359                          strlen(dmake_value->string_mb);
 360                 env = getmem(length);
 361                 (void) sprintf(env,
 362                            "%s=%s",
 363                            NOCATGETS("DMAKE_SHELL"),
 364                            dmake_value->string_mb);
 365                 (void) putenv(env);
 366         }
 367         MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_OUTPUT_MODE"));
 368         dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
 369         if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
 370             ((dmake_value = prop->body.macro.value) != NULL)) {
 371                 length = 2 + strlen(NOCATGETS("DMAKE_OUTPUT_MODE")) +
 372                          strlen(dmake_value->string_mb);
 373                 env = getmem(length);
 374                 (void) sprintf(env,
 375                            "%s=%s",
 376                            NOCATGETS("DMAKE_OUTPUT_MODE"),
 377                            dmake_value->string_mb);
 378                 (void) putenv(env);
 379         }
 380 }
 381 
 382 /*
 383  * void
 384  * distribute_rxm(Avo_DoJobMsg *dmake_job_msg)
 385  *
 386  * Write the DMake rule to be distributed down the pipe to rxm.
 387  *
 388  */
 389 void
 390 distribute_rxm(Avo_DoJobMsg *dmake_job_msg)
 391 {
 392         /* Add all dynamic env vars to the dmake_job_msg. */
 393         setvar_envvar(dmake_job_msg);
 394 
 395         /*
 396          * Copying dosys()...
 397          * Stat .make.state to see if we'll need to reread it later
 398          */
 399         make_state->stat.time = file_no_time;
 400         (void)exists(make_state);
 401         make_state_before = make_state->stat.time;
 402 
 403         // Wait for the first Acknowledge message from the rxm process
 404         // before sending the first message.
 405         if (!firstAcknowledgeReceived) {
 406                 firstAcknowledgeReceived++;
 407                 Avo_AcknowledgeMsg *msg = getAcknowledgeMsg();
 408                 if (msg) {
 409                         delete msg;
 410                 }
 411         }
 412 
 413         RWCollectable *doJobMsg = (RWCollectable *)dmake_job_msg;
 414         sigset_t newset;
 415         sigset_t oldset;
 416 
 417         AVO_BLOCK_INTERUPTS;
 418         int xdrResult = xdr(&xdrs_out, doJobMsg);
 419 
 420         if (xdrResult) {
 421                 fflush(dmake_ofp);
 422                 AVO_UNBLOCK_INTERUPTS;
 423         } else {
 424                 AVO_UNBLOCK_INTERUPTS;
 425                 fatal(catgets(catd, 1, 249, "Couldn't send the job request to rxm"));
 426         }
 427 
 428         delete dmake_job_msg;
 429 }
 430 
 431 // Queue for JobResult messages.
 432 static RWSlistCollectablesQueue jobResultQueue;
 433 
 434 // Queue for Acknowledge messages.
 435 static RWSlistCollectablesQueue acknowledgeQueue;
 436 
 437 // Read a message from the rxm process, and put it into a queue, by
 438 // message type. Return the message type.
 439 
 440 int
 441 getRxmMessage(void)
 442 {
 443         RWCollectable   *msg = (RWCollectable *)0;
 444         int             msgType = 0;
 445 //      sigset_t        newset;
 446 //      sigset_t        oldset;
 447 
 448         // It seems unnecessarily to block interrupts here because
 449         // any nonignored signal means exit for dmake in distributed mode.
 450 //      AVO_BLOCK_INTERUPTS;
 451         int xdrResult = xdr(&xdrs_in, msg);
 452 //      AVO_UNBLOCK_INTERUPTS;
 453 
 454         if (xdrResult) {
 455                 switch(msg->isA()) {
 456                 case __AVO_ACKNOWLEDGEMSG:
 457                         acknowledgeQueue.append(msg);
 458                         msgType = __AVO_ACKNOWLEDGEMSG;
 459                         break;
 460                 case __AVO_JOBRESULTMSG:
 461                         jobResultQueue.append(msg);
 462                         msgType = __AVO_JOBRESULTMSG;
 463                         break;
 464                 default:
 465                         warning(catgets(catd, 1, 291, "Unknown message on rxm input fd"));
 466                         msgType = 0;
 467                         break;
 468                 }
 469         } else {
 470                 if (errno == EINTR) {
 471                         fputs(NOCATGETS("dmake: Internal error: xdr() has been interrupted by a signal.\n"), stderr);
 472                 }
 473                 fatal(catgets(catd, 1, 250, "Couldn't receive message from rxm"));
 474         }
 475 
 476         return msgType;
 477 }
 478 
 479 // Get a JobResult message from it's queue, and
 480 // if the queue is empty, call the getRxmMessage() function until
 481 // a JobResult message shows.
 482 
 483 Avo_JobResultMsg *
 484 getJobResultMsg(void)
 485 {
 486         RWCollectable *msg = 0;
 487 
 488         if (!(msg = jobResultQueue.get())) {
 489                 while (getRxmMessage() != __AVO_JOBRESULTMSG);
 490                 msg = jobResultQueue.get();
 491         }
 492 
 493         return (Avo_JobResultMsg *)msg;
 494 }
 495 
 496 // Get an Acknowledge message from it's queue, and
 497 // if the queue is empty, call the getRxmMessage() function until
 498 // a Acknowledge message shows.
 499 
 500 Avo_AcknowledgeMsg *
 501 getAcknowledgeMsg(void)
 502 {
 503         RWCollectable *msg = 0;
 504 
 505         if (!(msg = acknowledgeQueue.get())) {
 506                 while (getRxmMessage() != __AVO_ACKNOWLEDGEMSG);
 507                 msg = acknowledgeQueue.get();
 508         }
 509 
 510         return (Avo_AcknowledgeMsg *)msg;
 511 }
 512 
 513 
 514 /*
 515  * Doname
 516  * await_dist(Boolean waitflg)
 517  *
 518  * Waits for distributed children to exit and finishes their processing.
 519  * Rxm will send a msg down the pipe when a child is done.
 520  *
 521  */
 522 Doname
 523 await_dist(Boolean waitflg)
 524 {
 525         Avo_JobResultMsg        *dmake_result_msg;
 526         int                     job_msg_id;
 527         Doname                  result = build_ok;
 528         int                     result_msg_cmd_status;
 529         int                     result_msg_job_status;
 530         Running                 rp;
 531 
 532         while (!(dmake_result_msg = getJobResultMsg()));
 533         job_msg_id = dmake_result_msg->getId();
 534         result_msg_cmd_status = dmake_result_msg->getCmdStatus();
 535         result_msg_job_status = dmake_result_msg->getJobStatus();
 536 
 537         if (waitflg) {
 538                 result = (result_msg_cmd_status == 0) ? build_ok : build_failed;
 539 
 540 #ifdef PRINT_EXIT_STATUS
 541                 if (result == build_ok) {
 542                         warning(NOCATGETS("I'm in await_dist(), waitflg is true, and result is build_ok."));
 543                 } else {
 544                         warning(NOCATGETS("I'm in await_dist(), waitflg is true, and result is build_failed."));
 545                 }
 546 #endif
 547 
 548         } else {
 549                 for (rp = running_list;
 550                      (rp != NULL) && (job_msg_id != rp->job_msg_id);
 551                      rp = rp->next) {
 552                 }
 553                 if (rp == NULL) {
 554                         fatal(catgets(catd, 1, 251, "Internal error: returned child job_msg_id not in running_list"));
 555                 } else {
 556                         /* XXX - This may not be correct! */
 557                         if (result_msg_job_status == RETURNED) {
 558                                 rp->state = build_ok;
 559                         } else {
 560                                 rp->state = (result_msg_cmd_status == 0) ? build_ok : build_failed;
 561                         }
 562                         result = rp->state;
 563 
 564 #ifdef PRINT_EXIT_STATUS
 565                         if (result == build_ok) {
 566                                 warning(NOCATGETS("I'm in await_dist(), waitflg is false, and result is build_ok."));
 567                         } else {
 568                                 warning(NOCATGETS("I'm in await_dist(), waitflg is false, and result is build_failed."));
 569                         }
 570 #endif
 571 
 572                 }
 573                 parallel_process_cnt--;
 574         }
 575         delete dmake_result_msg;
 576         return result;
 577 }
 578 
 579 #endif
 580 
 581