1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright 2004 Sun Microsystems, Inc. All rights reserved. 23 * Use is subject to license terms. 24 */ 25 26 #ifdef DISTRIBUTED 27 /* 28 * dist.cc 29 * 30 * Deal with the distributed processing 31 */ 32 33 #include <avo/err.h> 34 #include <avo/find_dir.h> 35 #include <avo/util.h> 36 #include <dm/Avo_AcknowledgeMsg.h> 37 #include <dm/Avo_DoJobMsg.h> 38 #include <dm/Avo_JobResultMsg.h> 39 #include <mk/defs.h> 40 #include <mksh/misc.h> /* getmem() */ 41 #include <rw/pstream.h> 42 #include <rw/queuecol.h> 43 #include <rw/xdrstrea.h> 44 #include <signal.h> 45 #ifdef linux 46 #include <sstream> 47 using namespace std; 48 #else 49 #include <strstream.h> 50 #endif 51 #include <sys/stat.h> /* stat() */ 52 #include <sys/types.h> 53 #include <sys/wait.h> 54 #include <unistd.h> 55 #include <errno.h> 56 57 /* 58 * Defined macros 59 */ 60 61 #define AVO_BLOCK_INTERUPTS sigfillset(&newset) ; \ 62 sigprocmask(SIG_SETMASK, &newset, &oldset) 63 64 #define AVO_UNBLOCK_INTERUPTS \ 65 sigprocmask(SIG_SETMASK, &oldset, &newset) 66 67 68 /* 69 * typedefs & structs 70 */ 71 72 /* 73 * Static variables 74 */ 75 int dmake_ifd; 76 FILE* dmake_ifp; 77 XDR xdrs_in; 78 79 int dmake_ofd; 80 FILE* dmake_ofp; 81 XDR xdrs_out; 82 83 // These instances are required for the RWfactory. 84 static Avo_JobResultMsg dummyJobResultMsg; 85 static Avo_AcknowledgeMsg dummyAcknowledgeMsg; 86 static int firstAcknowledgeReceived = 0; 87 88 int rxmPid = 0; 89 90 /* 91 * File table of contents 92 */ 93 static void set_dmake_env_vars(void); 94 95 /* 96 * void 97 * startup_rxm(void) 98 * 99 * When startup_rxm() is called, read_command_options() and 100 * read_files_and_state() have already been read, so DMake 101 * will now know what options to pass to rxm. 102 * 103 * rxm 104 * [ -k ] [ -n ] 105 * [ -c <dmake_rcfile> ] 106 * [ -g <dmake_group> ] 107 * [ -j <dmake_max_jobs> ] 108 * [ -m <dmake_mode> ] 109 * [ -o <dmake_odir> ] 110 * <read_fd> <write_fd> 111 * 112 * rxm will, among other things, read the rc file. 113 * 114 */ 115 void 116 startup_rxm(void) 117 { 118 Name dmake_name; 119 Name dmake_value; 120 Avo_err *find_run_dir_err; 121 int pipe1[2], pipe2[2]; 122 Property prop; 123 char *run_dir; 124 char rxm_command[MAXPATHLEN]; 125 int rxm_debug = 0; 126 127 int length; 128 char * env; 129 130 firstAcknowledgeReceived = 0; 131 /* 132 * Create two pipes, one for dmake->rxm, and one for rxm->dmake. 133 * pipe1 is dmake->rxm, 134 * pipe2 is rxm->dmake. 135 */ 136 if ((pipe(pipe1) < 0) || (pipe(pipe2) < 0)) { 137 fatal(catgets(catd, 1, 245, "pipe() failed: %s"), errmsg(errno)); 138 } 139 140 set_dmake_env_vars(); 141 142 if ((rxmPid = fork()) < 0) { /* error */ 143 fatal(catgets(catd, 1, 246, "fork() failed: %s"), errmsg(errno)); 144 } else if (rxmPid > 0) { /* parent, dmake */ 145 dmake_ofd = pipe1[1]; // write side of pipe 146 if (!(dmake_ofp = fdopen(dmake_ofd, "a"))) { 147 fatal(catgets(catd, 1, 247, "fdopen() failed: %s"), errmsg(errno)); 148 } 149 xdrstdio_create(&xdrs_out, dmake_ofp, XDR_ENCODE); 150 151 dmake_ifd = pipe2[0]; // read side of pipe 152 if (!(dmake_ifp = fdopen(dmake_ifd, "r"))) { 153 fatal(catgets(catd, 1, 248, "fdopen() failed: %s"), errmsg(errno)); 154 } 155 xdrstdio_create(&xdrs_in, dmake_ifp, XDR_DECODE); 156 157 close(pipe1[0]); // read side 158 close(pipe2[1]); // write side 159 } else { /* child, rxm */ 160 close(pipe1[1]); // write side 161 close(pipe2[0]); // read side 162 163 /* Find the run directory of dmake, for rxm. */ 164 find_run_dir_err = avo_find_run_dir(&run_dir); 165 if (find_run_dir_err) { 166 delete find_run_dir_err; 167 /* Use the path to find rxm. */ 168 (void) sprintf(rxm_command, NOCATGETS("rxm")); 169 } else { 170 /* Use the run dir of dmake for rxm. */ 171 (void) sprintf(rxm_command, NOCATGETS("%s/rxm"), run_dir); 172 } 173 174 if (continue_after_error) { 175 (void) strcat(rxm_command, NOCATGETS(" -k")); 176 } 177 if (do_not_exec_rule) { 178 (void) strcat(rxm_command, NOCATGETS(" -n")); 179 } 180 if (rxm_debug) { 181 (void) strcat(rxm_command, NOCATGETS(" -S")); 182 } 183 if (send_mtool_msgs) { 184 (void) sprintf(&rxm_command[strlen(rxm_command)], 185 NOCATGETS(" -O %d"), 186 mtool_msgs_fd); 187 } 188 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_RCFILE")); 189 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH); 190 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) && 191 ((dmake_value = prop->body.macro.value) != NULL)) { 192 (void) sprintf(&rxm_command[strlen(rxm_command)], 193 NOCATGETS(" -c %s"), 194 dmake_value->string_mb); 195 } else { 196 length = 2 + strlen(NOCATGETS("DMAKE_RCFILE")); 197 env = getmem(length); 198 (void) sprintf(env, 199 "%s=", 200 NOCATGETS("DMAKE_RCFILE")); 201 (void) putenv(env); 202 } 203 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_GROUP")); 204 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH); 205 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) && 206 ((dmake_value = prop->body.macro.value) != NULL)) { 207 (void) sprintf(&rxm_command[strlen(rxm_command)], 208 NOCATGETS(" -g %s"), 209 dmake_value->string_mb); 210 } else { 211 length = 2 + strlen(NOCATGETS("DMAKE_GROUP")); 212 env = getmem(length); 213 (void) sprintf(env, 214 "%s=", 215 NOCATGETS("DMAKE_GROUP")); 216 (void) putenv(env); 217 } 218 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_MAX_JOBS")); 219 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH); 220 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) && 221 ((dmake_value = prop->body.macro.value) != NULL)) { 222 (void) sprintf(&rxm_command[strlen(rxm_command)], 223 NOCATGETS(" -j %s"), 224 dmake_value->string_mb); 225 } else { 226 length = 2 + strlen(NOCATGETS("DMAKE_MAX_JOBS")); 227 env = getmem(length); 228 (void) sprintf(env, 229 "%s=", 230 NOCATGETS("DMAKE_MAX_JOBS")); 231 (void) putenv(env); 232 } 233 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_MODE")); 234 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH); 235 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) && 236 ((dmake_value = prop->body.macro.value) != NULL)) { 237 (void) sprintf(&rxm_command[strlen(rxm_command)], 238 NOCATGETS(" -m %s"), 239 dmake_value->string_mb); 240 } else { 241 length = 2 + strlen(NOCATGETS("DMAKE_MODE")); 242 env = getmem(length); 243 (void) sprintf(env, 244 "%s=", 245 NOCATGETS("DMAKE_MODE")); 246 (void) putenv(env); 247 } 248 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_ODIR")); 249 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH); 250 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) && 251 ((dmake_value = prop->body.macro.value) != NULL)) { 252 (void) sprintf(&rxm_command[strlen(rxm_command)], 253 NOCATGETS(" -o %s"), 254 dmake_value->string_mb); 255 } else { 256 length = 2 + strlen(NOCATGETS("DMAKE_ODIR")); 257 env = getmem(length); 258 (void) sprintf(env, 259 "%s=", 260 NOCATGETS("DMAKE_ODIR")); 261 (void) putenv(env); 262 } 263 264 (void) sprintf(&rxm_command[strlen(rxm_command)], 265 NOCATGETS(" %d %d"), 266 pipe1[0], pipe2[1]); 267 #ifdef linux 268 execl(NOCATGETS("/bin/sh"), 269 #else 270 execl(NOCATGETS("/usr/bin/sh"), 271 #endif 272 NOCATGETS("sh"), 273 NOCATGETS("-c"), 274 rxm_command, 275 (char *)NULL); 276 _exit(127); 277 } 278 } 279 280 /* 281 * static void 282 * set_dmake_env_vars() 283 * 284 * Sets the DMAKE_* environment variables for rxm and rxs. 285 * DMAKE_PWD 286 * DMAKE_NPWD 287 * DMAKE_UMASK 288 * DMAKE_SHELL 289 */ 290 static void 291 set_dmake_env_vars() 292 { 293 char *current_netpath; 294 char *current_path; 295 static char *env; 296 int length; 297 char netpath[MAXPATHLEN]; 298 mode_t um; 299 char um_buf[MAXPATHLEN]; 300 Name dmake_name; 301 Name dmake_value; 302 Property prop; 303 304 #ifdef REDIRECT_ERR 305 /* Set __DMAKE_REDIRECT_STDERR */ 306 length = 2 + strlen(NOCATGETS("__DMAKE_REDIRECT_STDERR")) + 1; 307 env = getmem(length); 308 (void) sprintf(env, 309 "%s=%s", 310 NOCATGETS("__DMAKE_REDIRECT_STDERR"), 311 out_err_same ? NOCATGETS("0") : NOCATGETS("1")); 312 (void) putenv(env); 313 #endif 314 315 /* Set DMAKE_PWD to the current working directory */ 316 current_path = get_current_path(); 317 length = 2 + strlen(NOCATGETS("DMAKE_PWD")) + strlen(current_path); 318 env = getmem(length); 319 (void) sprintf(env, 320 "%s=%s", 321 NOCATGETS("DMAKE_PWD"), 322 current_path); 323 (void) putenv(env); 324 325 /* Set DMAKE_NPWD to machine:pathname */ 326 if (avo_path_to_netpath(current_path, netpath)) { 327 current_netpath = netpath; 328 } else { 329 current_netpath = current_path; 330 } 331 length = 2 + strlen(NOCATGETS("DMAKE_NPWD")) + strlen(current_netpath); 332 env = getmem(length); 333 (void) sprintf(env, 334 "%s=%s", 335 NOCATGETS("DMAKE_NPWD"), 336 current_netpath); 337 (void) putenv(env); 338 339 /* Set DMAKE_UMASK to the value of umask */ 340 um = umask(0); 341 umask(um); 342 (void) sprintf(um_buf, NOCATGETS("%ul"), um); 343 length = 2 + strlen(NOCATGETS("DMAKE_UMASK")) + strlen(um_buf); 344 env = getmem(length); 345 (void) sprintf(env, 346 "%s=%s", 347 NOCATGETS("DMAKE_UMASK"), 348 um_buf); 349 (void) putenv(env); 350 351 if (((prop = get_prop(shell_name->prop, macro_prop)) != NULL) && 352 ((dmake_value = prop->body.macro.value) != NULL)) { 353 length = 2 + strlen(NOCATGETS("DMAKE_SHELL")) + 354 strlen(dmake_value->string_mb); 355 env = getmem(length); 356 (void) sprintf(env, 357 "%s=%s", 358 NOCATGETS("DMAKE_SHELL"), 359 dmake_value->string_mb); 360 (void) putenv(env); 361 } 362 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_OUTPUT_MODE")); 363 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH); 364 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) && 365 ((dmake_value = prop->body.macro.value) != NULL)) { 366 length = 2 + strlen(NOCATGETS("DMAKE_OUTPUT_MODE")) + 367 strlen(dmake_value->string_mb); 368 env = getmem(length); 369 (void) sprintf(env, 370 "%s=%s", 371 NOCATGETS("DMAKE_OUTPUT_MODE"), 372 dmake_value->string_mb); 373 (void) putenv(env); 374 } 375 } 376 377 /* 378 * void 379 * distribute_rxm(Avo_DoJobMsg *dmake_job_msg) 380 * 381 * Write the DMake rule to be distributed down the pipe to rxm. 382 * 383 */ 384 void 385 distribute_rxm(Avo_DoJobMsg *dmake_job_msg) 386 { 387 /* Add all dynamic env vars to the dmake_job_msg. */ 388 setvar_envvar(dmake_job_msg); 389 390 /* 391 * Copying dosys()... 392 * Stat .make.state to see if we'll need to reread it later 393 */ 394 make_state->stat.time = file_no_time; 395 (void)exists(make_state); 396 make_state_before = make_state->stat.time; 397 398 // Wait for the first Acknowledge message from the rxm process 399 // before sending the first message. 400 if (!firstAcknowledgeReceived) { 401 firstAcknowledgeReceived++; 402 Avo_AcknowledgeMsg *msg = getAcknowledgeMsg(); 403 if (msg) { 404 delete msg; 405 } 406 } 407 408 RWCollectable *doJobMsg = (RWCollectable *)dmake_job_msg; 409 sigset_t newset; 410 sigset_t oldset; 411 412 AVO_BLOCK_INTERUPTS; 413 int xdrResult = xdr(&xdrs_out, doJobMsg); 414 415 if (xdrResult) { 416 fflush(dmake_ofp); 417 AVO_UNBLOCK_INTERUPTS; 418 } else { 419 AVO_UNBLOCK_INTERUPTS; 420 fatal(catgets(catd, 1, 249, "Couldn't send the job request to rxm")); 421 } 422 423 delete dmake_job_msg; 424 } 425 426 // Queue for JobResult messages. 427 static RWSlistCollectablesQueue jobResultQueue; 428 429 // Queue for Acknowledge messages. 430 static RWSlistCollectablesQueue acknowledgeQueue; 431 432 // Read a message from the rxm process, and put it into a queue, by 433 // message type. Return the message type. 434 435 int 436 getRxmMessage(void) 437 { 438 RWCollectable *msg = (RWCollectable *)0; 439 int msgType = 0; 440 // sigset_t newset; 441 // sigset_t oldset; 442 443 // It seems unnecessarily to block interrupts here because 444 // any nonignored signal means exit for dmake in distributed mode. 445 // AVO_BLOCK_INTERUPTS; 446 int xdrResult = xdr(&xdrs_in, msg); 447 // AVO_UNBLOCK_INTERUPTS; 448 449 if (xdrResult) { 450 switch(msg->isA()) { 451 case __AVO_ACKNOWLEDGEMSG: 452 acknowledgeQueue.append(msg); 453 msgType = __AVO_ACKNOWLEDGEMSG; 454 break; 455 case __AVO_JOBRESULTMSG: 456 jobResultQueue.append(msg); 457 msgType = __AVO_JOBRESULTMSG; 458 break; 459 default: 460 warning(catgets(catd, 1, 291, "Unknown message on rxm input fd")); 461 msgType = 0; 462 break; 463 } 464 } else { 465 if (errno == EINTR) { 466 fputs(NOCATGETS("dmake: Internal error: xdr() has been interrupted by a signal.\n"), stderr); 467 } 468 fatal(catgets(catd, 1, 250, "Couldn't receive message from rxm")); 469 } 470 471 return msgType; 472 } 473 474 // Get a JobResult message from it's queue, and 475 // if the queue is empty, call the getRxmMessage() function until 476 // a JobResult message shows. 477 478 Avo_JobResultMsg * 479 getJobResultMsg(void) 480 { 481 RWCollectable *msg = 0; 482 483 if (!(msg = jobResultQueue.get())) { 484 while (getRxmMessage() != __AVO_JOBRESULTMSG); 485 msg = jobResultQueue.get(); 486 } 487 488 return (Avo_JobResultMsg *)msg; 489 } 490 491 // Get an Acknowledge message from it's queue, and 492 // if the queue is empty, call the getRxmMessage() function until 493 // a Acknowledge message shows. 494 495 Avo_AcknowledgeMsg * 496 getAcknowledgeMsg(void) 497 { 498 RWCollectable *msg = 0; 499 500 if (!(msg = acknowledgeQueue.get())) { 501 while (getRxmMessage() != __AVO_ACKNOWLEDGEMSG); 502 msg = acknowledgeQueue.get(); 503 } 504 505 return (Avo_AcknowledgeMsg *)msg; 506 } 507 508 509 /* 510 * Doname 511 * await_dist(Boolean waitflg) 512 * 513 * Waits for distributed children to exit and finishes their processing. 514 * Rxm will send a msg down the pipe when a child is done. 515 * 516 */ 517 Doname 518 await_dist(Boolean waitflg) 519 { 520 Avo_JobResultMsg *dmake_result_msg; 521 int job_msg_id; 522 Doname result = build_ok; 523 int result_msg_cmd_status; 524 int result_msg_job_status; 525 Running rp; 526 527 while (!(dmake_result_msg = getJobResultMsg())); 528 job_msg_id = dmake_result_msg->getId(); 529 result_msg_cmd_status = dmake_result_msg->getCmdStatus(); 530 result_msg_job_status = dmake_result_msg->getJobStatus(); 531 532 if (waitflg) { 533 result = (result_msg_cmd_status == 0) ? build_ok : build_failed; 534 535 #ifdef PRINT_EXIT_STATUS 536 if (result == build_ok) { 537 warning(NOCATGETS("I'm in await_dist(), waitflg is true, and result is build_ok.")); 538 } else { 539 warning(NOCATGETS("I'm in await_dist(), waitflg is true, and result is build_failed.")); 540 } 541 #endif 542 543 } else { 544 for (rp = running_list; 545 (rp != NULL) && (job_msg_id != rp->job_msg_id); 546 rp = rp->next) { 547 } 548 if (rp == NULL) { 549 fatal(catgets(catd, 1, 251, "Internal error: returned child job_msg_id not in running_list")); 550 } else { 551 /* XXX - This may not be correct! */ 552 if (result_msg_job_status == RETURNED) { 553 rp->state = build_ok; 554 } else { 555 rp->state = (result_msg_cmd_status == 0) ? build_ok : build_failed; 556 } 557 result = rp->state; 558 559 #ifdef PRINT_EXIT_STATUS 560 if (result == build_ok) { 561 warning(NOCATGETS("I'm in await_dist(), waitflg is false, and result is build_ok.")); 562 } else { 563 warning(NOCATGETS("I'm in await_dist(), waitflg is false, and result is build_failed.")); 564 } 565 #endif 566 567 } 568 parallel_process_cnt--; 569 } 570 delete dmake_result_msg; 571 return result; 572 } 573 574 #endif 575 576