1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright 2004 Sun Microsystems, Inc. All rights reserved. 23 * Use is subject to license terms. 24 */ 25 /* 26 * @(#)dist.cc 1.35 06/12/12 27 */ 28 29 #pragma ident "@(#)dist.cc 1.25 96/03/12" 30 31 #ifdef DISTRIBUTED 32 /* 33 * dist.cc 34 * 35 * Deal with the distributed processing 36 */ 37 38 #include <avo/err.h> 39 #include <avo/find_dir.h> 40 #include <avo/util.h> 41 #include <dm/Avo_AcknowledgeMsg.h> 42 #include <dm/Avo_DoJobMsg.h> 43 #include <dm/Avo_JobResultMsg.h> 44 #include <mk/defs.h> 45 #include <mksh/misc.h> /* getmem() */ 46 #include <rw/pstream.h> 47 #include <rw/queuecol.h> 48 #include <rw/xdrstrea.h> 49 #include <signal.h> 50 #ifdef linux 51 #include <sstream> 52 using namespace std; 53 #else 54 #include <strstream.h> 55 #endif 56 #include <sys/stat.h> /* stat() */ 57 #include <sys/types.h> 58 #include <sys/wait.h> 59 #include <unistd.h> 60 #include <errno.h> 61 62 /* 63 * Defined macros 64 */ 65 66 #define AVO_BLOCK_INTERUPTS sigfillset(&newset) ; \ 67 sigprocmask(SIG_SETMASK, &newset, &oldset) 68 69 #define AVO_UNBLOCK_INTERUPTS \ 70 sigprocmask(SIG_SETMASK, &oldset, &newset) 71 72 73 /* 74 * typedefs & structs 75 */ 76 77 /* 78 * Static variables 79 */ 80 int dmake_ifd; 81 FILE* dmake_ifp; 82 XDR xdrs_in; 83 84 int dmake_ofd; 85 FILE* dmake_ofp; 86 XDR xdrs_out; 87 88 // These instances are required for the RWfactory. 89 static Avo_JobResultMsg dummyJobResultMsg; 90 static Avo_AcknowledgeMsg dummyAcknowledgeMsg; 91 static int firstAcknowledgeReceived = 0; 92 93 int rxmPid = 0; 94 95 /* 96 * File table of contents 97 */ 98 static void set_dmake_env_vars(void); 99 100 /* 101 * void 102 * startup_rxm(void) 103 * 104 * When startup_rxm() is called, read_command_options() and 105 * read_files_and_state() have already been read, so DMake 106 * will now know what options to pass to rxm. 107 * 108 * rxm 109 * [ -k ] [ -n ] 110 * [ -c <dmake_rcfile> ] 111 * [ -g <dmake_group> ] 112 * [ -j <dmake_max_jobs> ] 113 * [ -m <dmake_mode> ] 114 * [ -o <dmake_odir> ] 115 * <read_fd> <write_fd> 116 * 117 * rxm will, among other things, read the rc file. 118 * 119 */ 120 void 121 startup_rxm(void) 122 { 123 Name dmake_name; 124 Name dmake_value; 125 Avo_err *find_run_dir_err; 126 int pipe1[2], pipe2[2]; 127 Property prop; 128 char *run_dir; 129 char rxm_command[MAXPATHLEN]; 130 int rxm_debug = 0; 131 132 int length; 133 char * env; 134 135 firstAcknowledgeReceived = 0; 136 /* 137 * Create two pipes, one for dmake->rxm, and one for rxm->dmake. 138 * pipe1 is dmake->rxm, 139 * pipe2 is rxm->dmake. 140 */ 141 if ((pipe(pipe1) < 0) || (pipe(pipe2) < 0)) { 142 fatal(catgets(catd, 1, 245, "pipe() failed: %s"), errmsg(errno)); 143 } 144 145 set_dmake_env_vars(); 146 147 if ((rxmPid = fork()) < 0) { /* error */ 148 fatal(catgets(catd, 1, 246, "fork() failed: %s"), errmsg(errno)); 149 } else if (rxmPid > 0) { /* parent, dmake */ 150 dmake_ofd = pipe1[1]; // write side of pipe 151 if (!(dmake_ofp = fdopen(dmake_ofd, "a"))) { 152 fatal(catgets(catd, 1, 247, "fdopen() failed: %s"), errmsg(errno)); 153 } 154 xdrstdio_create(&xdrs_out, dmake_ofp, XDR_ENCODE); 155 156 dmake_ifd = pipe2[0]; // read side of pipe 157 if (!(dmake_ifp = fdopen(dmake_ifd, "r"))) { 158 fatal(catgets(catd, 1, 248, "fdopen() failed: %s"), errmsg(errno)); 159 } 160 xdrstdio_create(&xdrs_in, dmake_ifp, XDR_DECODE); 161 162 close(pipe1[0]); // read side 163 close(pipe2[1]); // write side 164 } else { /* child, rxm */ 165 close(pipe1[1]); // write side 166 close(pipe2[0]); // read side 167 168 /* Find the run directory of dmake, for rxm. */ 169 find_run_dir_err = avo_find_run_dir(&run_dir); 170 if (find_run_dir_err) { 171 delete find_run_dir_err; 172 /* Use the path to find rxm. */ 173 (void) sprintf(rxm_command, NOCATGETS("rxm")); 174 } else { 175 /* Use the run dir of dmake for rxm. */ 176 (void) sprintf(rxm_command, NOCATGETS("%s/rxm"), run_dir); 177 } 178 179 if (continue_after_error) { 180 (void) strcat(rxm_command, NOCATGETS(" -k")); 181 } 182 if (do_not_exec_rule) { 183 (void) strcat(rxm_command, NOCATGETS(" -n")); 184 } 185 if (rxm_debug) { 186 (void) strcat(rxm_command, NOCATGETS(" -S")); 187 } 188 if (send_mtool_msgs) { 189 (void) sprintf(&rxm_command[strlen(rxm_command)], 190 NOCATGETS(" -O %d"), 191 mtool_msgs_fd); 192 } 193 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_RCFILE")); 194 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH); 195 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) && 196 ((dmake_value = prop->body.macro.value) != NULL)) { 197 (void) sprintf(&rxm_command[strlen(rxm_command)], 198 NOCATGETS(" -c %s"), 199 dmake_value->string_mb); 200 } else { 201 length = 2 + strlen(NOCATGETS("DMAKE_RCFILE")); 202 env = getmem(length); 203 (void) sprintf(env, 204 "%s=", 205 NOCATGETS("DMAKE_RCFILE")); 206 (void) putenv(env); 207 } 208 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_GROUP")); 209 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH); 210 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) && 211 ((dmake_value = prop->body.macro.value) != NULL)) { 212 (void) sprintf(&rxm_command[strlen(rxm_command)], 213 NOCATGETS(" -g %s"), 214 dmake_value->string_mb); 215 } else { 216 length = 2 + strlen(NOCATGETS("DMAKE_GROUP")); 217 env = getmem(length); 218 (void) sprintf(env, 219 "%s=", 220 NOCATGETS("DMAKE_GROUP")); 221 (void) putenv(env); 222 } 223 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_MAX_JOBS")); 224 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH); 225 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) && 226 ((dmake_value = prop->body.macro.value) != NULL)) { 227 (void) sprintf(&rxm_command[strlen(rxm_command)], 228 NOCATGETS(" -j %s"), 229 dmake_value->string_mb); 230 } else { 231 length = 2 + strlen(NOCATGETS("DMAKE_MAX_JOBS")); 232 env = getmem(length); 233 (void) sprintf(env, 234 "%s=", 235 NOCATGETS("DMAKE_MAX_JOBS")); 236 (void) putenv(env); 237 } 238 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_MODE")); 239 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH); 240 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) && 241 ((dmake_value = prop->body.macro.value) != NULL)) { 242 (void) sprintf(&rxm_command[strlen(rxm_command)], 243 NOCATGETS(" -m %s"), 244 dmake_value->string_mb); 245 } else { 246 length = 2 + strlen(NOCATGETS("DMAKE_MODE")); 247 env = getmem(length); 248 (void) sprintf(env, 249 "%s=", 250 NOCATGETS("DMAKE_MODE")); 251 (void) putenv(env); 252 } 253 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_ODIR")); 254 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH); 255 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) && 256 ((dmake_value = prop->body.macro.value) != NULL)) { 257 (void) sprintf(&rxm_command[strlen(rxm_command)], 258 NOCATGETS(" -o %s"), 259 dmake_value->string_mb); 260 } else { 261 length = 2 + strlen(NOCATGETS("DMAKE_ODIR")); 262 env = getmem(length); 263 (void) sprintf(env, 264 "%s=", 265 NOCATGETS("DMAKE_ODIR")); 266 (void) putenv(env); 267 } 268 269 (void) sprintf(&rxm_command[strlen(rxm_command)], 270 NOCATGETS(" %d %d"), 271 pipe1[0], pipe2[1]); 272 #ifdef linux 273 execl(NOCATGETS("/bin/sh"), 274 #else 275 execl(NOCATGETS("/usr/bin/sh"), 276 #endif 277 NOCATGETS("sh"), 278 NOCATGETS("-c"), 279 rxm_command, 280 (char *)NULL); 281 _exit(127); 282 } 283 } 284 285 /* 286 * static void 287 * set_dmake_env_vars() 288 * 289 * Sets the DMAKE_* environment variables for rxm and rxs. 290 * DMAKE_PWD 291 * DMAKE_NPWD 292 * DMAKE_UMASK 293 * DMAKE_SHELL 294 */ 295 static void 296 set_dmake_env_vars() 297 { 298 char *current_netpath; 299 char *current_path; 300 static char *env; 301 int length; 302 char netpath[MAXPATHLEN]; 303 mode_t um; 304 char um_buf[MAXPATHLEN]; 305 Name dmake_name; 306 Name dmake_value; 307 Property prop; 308 309 #ifdef REDIRECT_ERR 310 /* Set __DMAKE_REDIRECT_STDERR */ 311 length = 2 + strlen(NOCATGETS("__DMAKE_REDIRECT_STDERR")) + 1; 312 env = getmem(length); 313 (void) sprintf(env, 314 "%s=%s", 315 NOCATGETS("__DMAKE_REDIRECT_STDERR"), 316 out_err_same ? NOCATGETS("0") : NOCATGETS("1")); 317 (void) putenv(env); 318 #endif 319 320 /* Set DMAKE_PWD to the current working directory */ 321 current_path = get_current_path(); 322 length = 2 + strlen(NOCATGETS("DMAKE_PWD")) + strlen(current_path); 323 env = getmem(length); 324 (void) sprintf(env, 325 "%s=%s", 326 NOCATGETS("DMAKE_PWD"), 327 current_path); 328 (void) putenv(env); 329 330 /* Set DMAKE_NPWD to machine:pathname */ 331 if (avo_path_to_netpath(current_path, netpath)) { 332 current_netpath = netpath; 333 } else { 334 current_netpath = current_path; 335 } 336 length = 2 + strlen(NOCATGETS("DMAKE_NPWD")) + strlen(current_netpath); 337 env = getmem(length); 338 (void) sprintf(env, 339 "%s=%s", 340 NOCATGETS("DMAKE_NPWD"), 341 current_netpath); 342 (void) putenv(env); 343 344 /* Set DMAKE_UMASK to the value of umask */ 345 um = umask(0); 346 umask(um); 347 (void) sprintf(um_buf, NOCATGETS("%ul"), um); 348 length = 2 + strlen(NOCATGETS("DMAKE_UMASK")) + strlen(um_buf); 349 env = getmem(length); 350 (void) sprintf(env, 351 "%s=%s", 352 NOCATGETS("DMAKE_UMASK"), 353 um_buf); 354 (void) putenv(env); 355 356 if (((prop = get_prop(shell_name->prop, macro_prop)) != NULL) && 357 ((dmake_value = prop->body.macro.value) != NULL)) { 358 length = 2 + strlen(NOCATGETS("DMAKE_SHELL")) + 359 strlen(dmake_value->string_mb); 360 env = getmem(length); 361 (void) sprintf(env, 362 "%s=%s", 363 NOCATGETS("DMAKE_SHELL"), 364 dmake_value->string_mb); 365 (void) putenv(env); 366 } 367 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_OUTPUT_MODE")); 368 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH); 369 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) && 370 ((dmake_value = prop->body.macro.value) != NULL)) { 371 length = 2 + strlen(NOCATGETS("DMAKE_OUTPUT_MODE")) + 372 strlen(dmake_value->string_mb); 373 env = getmem(length); 374 (void) sprintf(env, 375 "%s=%s", 376 NOCATGETS("DMAKE_OUTPUT_MODE"), 377 dmake_value->string_mb); 378 (void) putenv(env); 379 } 380 } 381 382 /* 383 * void 384 * distribute_rxm(Avo_DoJobMsg *dmake_job_msg) 385 * 386 * Write the DMake rule to be distributed down the pipe to rxm. 387 * 388 */ 389 void 390 distribute_rxm(Avo_DoJobMsg *dmake_job_msg) 391 { 392 /* Add all dynamic env vars to the dmake_job_msg. */ 393 setvar_envvar(dmake_job_msg); 394 395 /* 396 * Copying dosys()... 397 * Stat .make.state to see if we'll need to reread it later 398 */ 399 make_state->stat.time = file_no_time; 400 (void)exists(make_state); 401 make_state_before = make_state->stat.time; 402 403 // Wait for the first Acknowledge message from the rxm process 404 // before sending the first message. 405 if (!firstAcknowledgeReceived) { 406 firstAcknowledgeReceived++; 407 Avo_AcknowledgeMsg *msg = getAcknowledgeMsg(); 408 if (msg) { 409 delete msg; 410 } 411 } 412 413 RWCollectable *doJobMsg = (RWCollectable *)dmake_job_msg; 414 sigset_t newset; 415 sigset_t oldset; 416 417 AVO_BLOCK_INTERUPTS; 418 int xdrResult = xdr(&xdrs_out, doJobMsg); 419 420 if (xdrResult) { 421 fflush(dmake_ofp); 422 AVO_UNBLOCK_INTERUPTS; 423 } else { 424 AVO_UNBLOCK_INTERUPTS; 425 fatal(catgets(catd, 1, 249, "Couldn't send the job request to rxm")); 426 } 427 428 delete dmake_job_msg; 429 } 430 431 // Queue for JobResult messages. 432 static RWSlistCollectablesQueue jobResultQueue; 433 434 // Queue for Acknowledge messages. 435 static RWSlistCollectablesQueue acknowledgeQueue; 436 437 // Read a message from the rxm process, and put it into a queue, by 438 // message type. Return the message type. 439 440 int 441 getRxmMessage(void) 442 { 443 RWCollectable *msg = (RWCollectable *)0; 444 int msgType = 0; 445 // sigset_t newset; 446 // sigset_t oldset; 447 448 // It seems unnecessarily to block interrupts here because 449 // any nonignored signal means exit for dmake in distributed mode. 450 // AVO_BLOCK_INTERUPTS; 451 int xdrResult = xdr(&xdrs_in, msg); 452 // AVO_UNBLOCK_INTERUPTS; 453 454 if (xdrResult) { 455 switch(msg->isA()) { 456 case __AVO_ACKNOWLEDGEMSG: 457 acknowledgeQueue.append(msg); 458 msgType = __AVO_ACKNOWLEDGEMSG; 459 break; 460 case __AVO_JOBRESULTMSG: 461 jobResultQueue.append(msg); 462 msgType = __AVO_JOBRESULTMSG; 463 break; 464 default: 465 warning(catgets(catd, 1, 291, "Unknown message on rxm input fd")); 466 msgType = 0; 467 break; 468 } 469 } else { 470 if (errno == EINTR) { 471 fputs(NOCATGETS("dmake: Internal error: xdr() has been interrupted by a signal.\n"), stderr); 472 } 473 fatal(catgets(catd, 1, 250, "Couldn't receive message from rxm")); 474 } 475 476 return msgType; 477 } 478 479 // Get a JobResult message from it's queue, and 480 // if the queue is empty, call the getRxmMessage() function until 481 // a JobResult message shows. 482 483 Avo_JobResultMsg * 484 getJobResultMsg(void) 485 { 486 RWCollectable *msg = 0; 487 488 if (!(msg = jobResultQueue.get())) { 489 while (getRxmMessage() != __AVO_JOBRESULTMSG); 490 msg = jobResultQueue.get(); 491 } 492 493 return (Avo_JobResultMsg *)msg; 494 } 495 496 // Get an Acknowledge message from it's queue, and 497 // if the queue is empty, call the getRxmMessage() function until 498 // a Acknowledge message shows. 499 500 Avo_AcknowledgeMsg * 501 getAcknowledgeMsg(void) 502 { 503 RWCollectable *msg = 0; 504 505 if (!(msg = acknowledgeQueue.get())) { 506 while (getRxmMessage() != __AVO_ACKNOWLEDGEMSG); 507 msg = acknowledgeQueue.get(); 508 } 509 510 return (Avo_AcknowledgeMsg *)msg; 511 } 512 513 514 /* 515 * Doname 516 * await_dist(Boolean waitflg) 517 * 518 * Waits for distributed children to exit and finishes their processing. 519 * Rxm will send a msg down the pipe when a child is done. 520 * 521 */ 522 Doname 523 await_dist(Boolean waitflg) 524 { 525 Avo_JobResultMsg *dmake_result_msg; 526 int job_msg_id; 527 Doname result = build_ok; 528 int result_msg_cmd_status; 529 int result_msg_job_status; 530 Running rp; 531 532 while (!(dmake_result_msg = getJobResultMsg())); 533 job_msg_id = dmake_result_msg->getId(); 534 result_msg_cmd_status = dmake_result_msg->getCmdStatus(); 535 result_msg_job_status = dmake_result_msg->getJobStatus(); 536 537 if (waitflg) { 538 result = (result_msg_cmd_status == 0) ? build_ok : build_failed; 539 540 #ifdef PRINT_EXIT_STATUS 541 if (result == build_ok) { 542 warning(NOCATGETS("I'm in await_dist(), waitflg is true, and result is build_ok.")); 543 } else { 544 warning(NOCATGETS("I'm in await_dist(), waitflg is true, and result is build_failed.")); 545 } 546 #endif 547 548 } else { 549 for (rp = running_list; 550 (rp != NULL) && (job_msg_id != rp->job_msg_id); 551 rp = rp->next) { 552 } 553 if (rp == NULL) { 554 fatal(catgets(catd, 1, 251, "Internal error: returned child job_msg_id not in running_list")); 555 } else { 556 /* XXX - This may not be correct! */ 557 if (result_msg_job_status == RETURNED) { 558 rp->state = build_ok; 559 } else { 560 rp->state = (result_msg_cmd_status == 0) ? build_ok : build_failed; 561 } 562 result = rp->state; 563 564 #ifdef PRINT_EXIT_STATUS 565 if (result == build_ok) { 566 warning(NOCATGETS("I'm in await_dist(), waitflg is false, and result is build_ok.")); 567 } else { 568 warning(NOCATGETS("I'm in await_dist(), waitflg is false, and result is build_failed.")); 569 } 570 #endif 571 572 } 573 parallel_process_cnt--; 574 } 575 delete dmake_result_msg; 576 return result; 577 } 578 579 #endif 580 581