1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright 2004 Sun Microsystems, Inc. All rights reserved. 23 * Use is subject to license terms. 24 */ 25 26 #ifdef DISTRIBUTED 27 /* 28 * dist.cc 29 * 30 * Deal with the distributed processing 31 */ 32 33 #include <avo/err.h> 34 #include <avo/find_dir.h> 35 #include <avo/util.h> 36 #include <dm/Avo_AcknowledgeMsg.h> 37 #include <dm/Avo_DoJobMsg.h> 38 #include <dm/Avo_JobResultMsg.h> 39 #include <mk/defs.h> 40 #include <mksh/misc.h> /* getmem() */ 41 #include <rw/pstream.h> 42 #include <rw/queuecol.h> 43 #include <rw/xdrstrea.h> 44 #include <signal.h> 45 #include <strstream.h> 46 #include <sys/stat.h> /* stat() */ 47 #include <sys/types.h> 48 #include <sys/wait.h> 49 #include <unistd.h> 50 #include <errno.h> 51 52 /* 53 * Defined macros 54 */ 55 56 #define AVO_BLOCK_INTERUPTS sigfillset(&newset) ; \ 57 sigprocmask(SIG_SETMASK, &newset, &oldset) 58 59 #define AVO_UNBLOCK_INTERUPTS \ 60 sigprocmask(SIG_SETMASK, &oldset, &newset) 61 62 63 /* 64 * typedefs & structs 65 */ 66 67 /* 68 * Static variables 69 */ 70 int dmake_ifd; 71 FILE* dmake_ifp; 72 XDR xdrs_in; 73 74 int dmake_ofd; 75 FILE* dmake_ofp; 76 XDR xdrs_out; 77 78 // These instances are required for the RWfactory. 79 static Avo_JobResultMsg dummyJobResultMsg; 80 static Avo_AcknowledgeMsg dummyAcknowledgeMsg; 81 static int firstAcknowledgeReceived = 0; 82 83 int rxmPid = 0; 84 85 /* 86 * File table of contents 87 */ 88 static void set_dmake_env_vars(void); 89 90 /* 91 * void 92 * startup_rxm(void) 93 * 94 * When startup_rxm() is called, read_command_options() and 95 * read_files_and_state() have already been read, so DMake 96 * will now know what options to pass to rxm. 97 * 98 * rxm 99 * [ -k ] [ -n ] 100 * [ -c <dmake_rcfile> ] 101 * [ -g <dmake_group> ] 102 * [ -j <dmake_max_jobs> ] 103 * [ -m <dmake_mode> ] 104 * [ -o <dmake_odir> ] 105 * <read_fd> <write_fd> 106 * 107 * rxm will, among other things, read the rc file. 108 * 109 */ 110 void 111 startup_rxm(void) 112 { 113 Name dmake_name; 114 Name dmake_value; 115 Avo_err *find_run_dir_err; 116 int pipe1[2], pipe2[2]; 117 Property prop; 118 char *run_dir; 119 char rxm_command[MAXPATHLEN]; 120 int rxm_debug = 0; 121 122 int length; 123 char * env; 124 125 firstAcknowledgeReceived = 0; 126 /* 127 * Create two pipes, one for dmake->rxm, and one for rxm->dmake. 128 * pipe1 is dmake->rxm, 129 * pipe2 is rxm->dmake. 130 */ 131 if ((pipe(pipe1) < 0) || (pipe(pipe2) < 0)) { 132 fatal(catgets(catd, 1, 245, "pipe() failed: %s"), errmsg(errno)); 133 } 134 135 set_dmake_env_vars(); 136 137 if ((rxmPid = fork()) < 0) { /* error */ 138 fatal(catgets(catd, 1, 246, "fork() failed: %s"), errmsg(errno)); 139 } else if (rxmPid > 0) { /* parent, dmake */ 140 dmake_ofd = pipe1[1]; // write side of pipe 141 if (!(dmake_ofp = fdopen(dmake_ofd, "a"))) { 142 fatal(catgets(catd, 1, 247, "fdopen() failed: %s"), errmsg(errno)); 143 } 144 xdrstdio_create(&xdrs_out, dmake_ofp, XDR_ENCODE); 145 146 dmake_ifd = pipe2[0]; // read side of pipe 147 if (!(dmake_ifp = fdopen(dmake_ifd, "r"))) { 148 fatal(catgets(catd, 1, 248, "fdopen() failed: %s"), errmsg(errno)); 149 } 150 xdrstdio_create(&xdrs_in, dmake_ifp, XDR_DECODE); 151 152 close(pipe1[0]); // read side 153 close(pipe2[1]); // write side 154 } else { /* child, rxm */ 155 close(pipe1[1]); // write side 156 close(pipe2[0]); // read side 157 158 /* Find the run directory of dmake, for rxm. */ 159 find_run_dir_err = avo_find_run_dir(&run_dir); 160 if (find_run_dir_err) { 161 delete find_run_dir_err; 162 /* Use the path to find rxm. */ 163 (void) sprintf(rxm_command, NOCATGETS("rxm")); 164 } else { 165 /* Use the run dir of dmake for rxm. */ 166 (void) sprintf(rxm_command, NOCATGETS("%s/rxm"), run_dir); 167 } 168 169 if (continue_after_error) { 170 (void) strcat(rxm_command, NOCATGETS(" -k")); 171 } 172 if (do_not_exec_rule) { 173 (void) strcat(rxm_command, NOCATGETS(" -n")); 174 } 175 if (rxm_debug) { 176 (void) strcat(rxm_command, NOCATGETS(" -S")); 177 } 178 if (send_mtool_msgs) { 179 (void) sprintf(&rxm_command[strlen(rxm_command)], 180 NOCATGETS(" -O %d"), 181 mtool_msgs_fd); 182 } 183 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_RCFILE")); 184 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH); 185 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) && 186 ((dmake_value = prop->body.macro.value) != NULL)) { 187 (void) sprintf(&rxm_command[strlen(rxm_command)], 188 NOCATGETS(" -c %s"), 189 dmake_value->string_mb); 190 } else { 191 length = 2 + strlen(NOCATGETS("DMAKE_RCFILE")); 192 env = getmem(length); 193 (void) sprintf(env, 194 "%s=", 195 NOCATGETS("DMAKE_RCFILE")); 196 (void) putenv(env); 197 } 198 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_GROUP")); 199 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH); 200 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) && 201 ((dmake_value = prop->body.macro.value) != NULL)) { 202 (void) sprintf(&rxm_command[strlen(rxm_command)], 203 NOCATGETS(" -g %s"), 204 dmake_value->string_mb); 205 } else { 206 length = 2 + strlen(NOCATGETS("DMAKE_GROUP")); 207 env = getmem(length); 208 (void) sprintf(env, 209 "%s=", 210 NOCATGETS("DMAKE_GROUP")); 211 (void) putenv(env); 212 } 213 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_MAX_JOBS")); 214 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH); 215 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) && 216 ((dmake_value = prop->body.macro.value) != NULL)) { 217 (void) sprintf(&rxm_command[strlen(rxm_command)], 218 NOCATGETS(" -j %s"), 219 dmake_value->string_mb); 220 } else { 221 length = 2 + strlen(NOCATGETS("DMAKE_MAX_JOBS")); 222 env = getmem(length); 223 (void) sprintf(env, 224 "%s=", 225 NOCATGETS("DMAKE_MAX_JOBS")); 226 (void) putenv(env); 227 } 228 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_MODE")); 229 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH); 230 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) && 231 ((dmake_value = prop->body.macro.value) != NULL)) { 232 (void) sprintf(&rxm_command[strlen(rxm_command)], 233 NOCATGETS(" -m %s"), 234 dmake_value->string_mb); 235 } else { 236 length = 2 + strlen(NOCATGETS("DMAKE_MODE")); 237 env = getmem(length); 238 (void) sprintf(env, 239 "%s=", 240 NOCATGETS("DMAKE_MODE")); 241 (void) putenv(env); 242 } 243 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_ODIR")); 244 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH); 245 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) && 246 ((dmake_value = prop->body.macro.value) != NULL)) { 247 (void) sprintf(&rxm_command[strlen(rxm_command)], 248 NOCATGETS(" -o %s"), 249 dmake_value->string_mb); 250 } else { 251 length = 2 + strlen(NOCATGETS("DMAKE_ODIR")); 252 env = getmem(length); 253 (void) sprintf(env, 254 "%s=", 255 NOCATGETS("DMAKE_ODIR")); 256 (void) putenv(env); 257 } 258 259 (void) sprintf(&rxm_command[strlen(rxm_command)], 260 NOCATGETS(" %d %d"), 261 pipe1[0], pipe2[1]); 262 execl(NOCATGETS("/usr/bin/sh"), 263 NOCATGETS("sh"), 264 NOCATGETS("-c"), 265 rxm_command, 266 (char *)NULL); 267 _exit(127); 268 } 269 } 270 271 /* 272 * static void 273 * set_dmake_env_vars() 274 * 275 * Sets the DMAKE_* environment variables for rxm and rxs. 276 * DMAKE_PWD 277 * DMAKE_NPWD 278 * DMAKE_UMASK 279 * DMAKE_SHELL 280 */ 281 static void 282 set_dmake_env_vars() 283 { 284 char *current_netpath; 285 char *current_path; 286 static char *env; 287 int length; 288 char netpath[MAXPATHLEN]; 289 mode_t um; 290 char um_buf[MAXPATHLEN]; 291 Name dmake_name; 292 Name dmake_value; 293 Property prop; 294 295 #ifdef REDIRECT_ERR 296 /* Set __DMAKE_REDIRECT_STDERR */ 297 length = 2 + strlen(NOCATGETS("__DMAKE_REDIRECT_STDERR")) + 1; 298 env = getmem(length); 299 (void) sprintf(env, 300 "%s=%s", 301 NOCATGETS("__DMAKE_REDIRECT_STDERR"), 302 out_err_same ? NOCATGETS("0") : NOCATGETS("1")); 303 (void) putenv(env); 304 #endif 305 306 /* Set DMAKE_PWD to the current working directory */ 307 current_path = get_current_path(); 308 length = 2 + strlen(NOCATGETS("DMAKE_PWD")) + strlen(current_path); 309 env = getmem(length); 310 (void) sprintf(env, 311 "%s=%s", 312 NOCATGETS("DMAKE_PWD"), 313 current_path); 314 (void) putenv(env); 315 316 /* Set DMAKE_NPWD to machine:pathname */ 317 if (avo_path_to_netpath(current_path, netpath)) { 318 current_netpath = netpath; 319 } else { 320 current_netpath = current_path; 321 } 322 length = 2 + strlen(NOCATGETS("DMAKE_NPWD")) + strlen(current_netpath); 323 env = getmem(length); 324 (void) sprintf(env, 325 "%s=%s", 326 NOCATGETS("DMAKE_NPWD"), 327 current_netpath); 328 (void) putenv(env); 329 330 /* Set DMAKE_UMASK to the value of umask */ 331 um = umask(0); 332 umask(um); 333 (void) sprintf(um_buf, NOCATGETS("%ul"), um); 334 length = 2 + strlen(NOCATGETS("DMAKE_UMASK")) + strlen(um_buf); 335 env = getmem(length); 336 (void) sprintf(env, 337 "%s=%s", 338 NOCATGETS("DMAKE_UMASK"), 339 um_buf); 340 (void) putenv(env); 341 342 if (((prop = get_prop(shell_name->prop, macro_prop)) != NULL) && 343 ((dmake_value = prop->body.macro.value) != NULL)) { 344 length = 2 + strlen(NOCATGETS("DMAKE_SHELL")) + 345 strlen(dmake_value->string_mb); 346 env = getmem(length); 347 (void) sprintf(env, 348 "%s=%s", 349 NOCATGETS("DMAKE_SHELL"), 350 dmake_value->string_mb); 351 (void) putenv(env); 352 } 353 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_OUTPUT_MODE")); 354 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH); 355 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) && 356 ((dmake_value = prop->body.macro.value) != NULL)) { 357 length = 2 + strlen(NOCATGETS("DMAKE_OUTPUT_MODE")) + 358 strlen(dmake_value->string_mb); 359 env = getmem(length); 360 (void) sprintf(env, 361 "%s=%s", 362 NOCATGETS("DMAKE_OUTPUT_MODE"), 363 dmake_value->string_mb); 364 (void) putenv(env); 365 } 366 } 367 368 /* 369 * void 370 * distribute_rxm(Avo_DoJobMsg *dmake_job_msg) 371 * 372 * Write the DMake rule to be distributed down the pipe to rxm. 373 * 374 */ 375 void 376 distribute_rxm(Avo_DoJobMsg *dmake_job_msg) 377 { 378 /* Add all dynamic env vars to the dmake_job_msg. */ 379 setvar_envvar(dmake_job_msg); 380 381 /* 382 * Copying dosys()... 383 * Stat .make.state to see if we'll need to reread it later 384 */ 385 make_state->stat.time = file_no_time; 386 (void)exists(make_state); 387 make_state_before = make_state->stat.time; 388 389 // Wait for the first Acknowledge message from the rxm process 390 // before sending the first message. 391 if (!firstAcknowledgeReceived) { 392 firstAcknowledgeReceived++; 393 Avo_AcknowledgeMsg *msg = getAcknowledgeMsg(); 394 if (msg) { 395 delete msg; 396 } 397 } 398 399 RWCollectable *doJobMsg = (RWCollectable *)dmake_job_msg; 400 sigset_t newset; 401 sigset_t oldset; 402 403 AVO_BLOCK_INTERUPTS; 404 int xdrResult = xdr(&xdrs_out, doJobMsg); 405 406 if (xdrResult) { 407 fflush(dmake_ofp); 408 AVO_UNBLOCK_INTERUPTS; 409 } else { 410 AVO_UNBLOCK_INTERUPTS; 411 fatal(catgets(catd, 1, 249, "Couldn't send the job request to rxm")); 412 } 413 414 delete dmake_job_msg; 415 } 416 417 // Queue for JobResult messages. 418 static RWSlistCollectablesQueue jobResultQueue; 419 420 // Queue for Acknowledge messages. 421 static RWSlistCollectablesQueue acknowledgeQueue; 422 423 // Read a message from the rxm process, and put it into a queue, by 424 // message type. Return the message type. 425 426 int 427 getRxmMessage(void) 428 { 429 RWCollectable *msg = (RWCollectable *)0; 430 int msgType = 0; 431 // sigset_t newset; 432 // sigset_t oldset; 433 434 // It seems unnecessarily to block interrupts here because 435 // any nonignored signal means exit for dmake in distributed mode. 436 // AVO_BLOCK_INTERUPTS; 437 int xdrResult = xdr(&xdrs_in, msg); 438 // AVO_UNBLOCK_INTERUPTS; 439 440 if (xdrResult) { 441 switch(msg->isA()) { 442 case __AVO_ACKNOWLEDGEMSG: 443 acknowledgeQueue.append(msg); 444 msgType = __AVO_ACKNOWLEDGEMSG; 445 break; 446 case __AVO_JOBRESULTMSG: 447 jobResultQueue.append(msg); 448 msgType = __AVO_JOBRESULTMSG; 449 break; 450 default: 451 warning(catgets(catd, 1, 291, "Unknown message on rxm input fd")); 452 msgType = 0; 453 break; 454 } 455 } else { 456 if (errno == EINTR) { 457 fputs(NOCATGETS("dmake: Internal error: xdr() has been interrupted by a signal.\n"), stderr); 458 } 459 fatal(catgets(catd, 1, 250, "Couldn't receive message from rxm")); 460 } 461 462 return msgType; 463 } 464 465 // Get a JobResult message from it's queue, and 466 // if the queue is empty, call the getRxmMessage() function until 467 // a JobResult message shows. 468 469 Avo_JobResultMsg * 470 getJobResultMsg(void) 471 { 472 RWCollectable *msg = 0; 473 474 if (!(msg = jobResultQueue.get())) { 475 while (getRxmMessage() != __AVO_JOBRESULTMSG); 476 msg = jobResultQueue.get(); 477 } 478 479 return (Avo_JobResultMsg *)msg; 480 } 481 482 // Get an Acknowledge message from it's queue, and 483 // if the queue is empty, call the getRxmMessage() function until 484 // a Acknowledge message shows. 485 486 Avo_AcknowledgeMsg * 487 getAcknowledgeMsg(void) 488 { 489 RWCollectable *msg = 0; 490 491 if (!(msg = acknowledgeQueue.get())) { 492 while (getRxmMessage() != __AVO_ACKNOWLEDGEMSG); 493 msg = acknowledgeQueue.get(); 494 } 495 496 return (Avo_AcknowledgeMsg *)msg; 497 } 498 499 500 /* 501 * Doname 502 * await_dist(Boolean waitflg) 503 * 504 * Waits for distributed children to exit and finishes their processing. 505 * Rxm will send a msg down the pipe when a child is done. 506 * 507 */ 508 Doname 509 await_dist(Boolean waitflg) 510 { 511 Avo_JobResultMsg *dmake_result_msg; 512 int job_msg_id; 513 Doname result = build_ok; 514 int result_msg_cmd_status; 515 int result_msg_job_status; 516 Running rp; 517 518 while (!(dmake_result_msg = getJobResultMsg())); 519 job_msg_id = dmake_result_msg->getId(); 520 result_msg_cmd_status = dmake_result_msg->getCmdStatus(); 521 result_msg_job_status = dmake_result_msg->getJobStatus(); 522 523 if (waitflg) { 524 result = (result_msg_cmd_status == 0) ? build_ok : build_failed; 525 526 #ifdef PRINT_EXIT_STATUS 527 if (result == build_ok) { 528 warning(NOCATGETS("I'm in await_dist(), waitflg is true, and result is build_ok.")); 529 } else { 530 warning(NOCATGETS("I'm in await_dist(), waitflg is true, and result is build_failed.")); 531 } 532 #endif 533 534 } else { 535 for (rp = running_list; 536 (rp != NULL) && (job_msg_id != rp->job_msg_id); 537 rp = rp->next) { 538 } 539 if (rp == NULL) { 540 fatal(catgets(catd, 1, 251, "Internal error: returned child job_msg_id not in running_list")); 541 } else { 542 /* XXX - This may not be correct! */ 543 if (result_msg_job_status == RETURNED) { 544 rp->state = build_ok; 545 } else { 546 rp->state = (result_msg_cmd_status == 0) ? build_ok : build_failed; 547 } 548 result = rp->state; 549 550 #ifdef PRINT_EXIT_STATUS 551 if (result == build_ok) { 552 warning(NOCATGETS("I'm in await_dist(), waitflg is false, and result is build_ok.")); 553 } else { 554 warning(NOCATGETS("I'm in await_dist(), waitflg is false, and result is build_failed.")); 555 } 556 #endif 557 558 } 559 parallel_process_cnt--; 560 } 561 delete dmake_result_msg; 562 return result; 563 } 564 565 #endif 566 567