1 /* 2 * CDDL HEADER START 3 * 4 * The contents of this file are subject to the terms of the 5 * Common Development and Distribution License (the "License"). 6 * You may not use this file except in compliance with the License. 7 * 8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE 9 * or http://www.opensolaris.org/os/licensing. 10 * See the License for the specific language governing permissions 11 * and limitations under the License. 12 * 13 * When distributing Covered Code, include this CDDL HEADER in each 14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE. 15 * If applicable, add the following below this CDDL HEADER, with the 16 * fields enclosed by brackets "[]" replaced with your own identifying 17 * information: Portions Copyright [yyyy] [name of copyright owner] 18 * 19 * CDDL HEADER END 20 */ 21 /* 22 * Copyright 2004 Sun Microsystems, Inc. All rights reserved. 23 * Use is subject to license terms. 24 */ 25 26 #ifdef DISTRIBUTED 27 /* 28 * dist.cc 29 * 30 * Deal with the distributed processing 31 */ 32 33 #include <avo/err.h> 34 #include <avo/find_dir.h> 35 #include <avo/util.h> 36 #include <dm/Avo_AcknowledgeMsg.h> 37 #include <dm/Avo_DoJobMsg.h> 38 #include <dm/Avo_JobResultMsg.h> 39 #include <mk/defs.h> 40 #include <mksh/misc.h> /* getmem() */ 41 #include <rw/pstream.h> 42 #include <rw/queuecol.h> 43 #include <rw/xdrstrea.h> 44 #include <signal.h> 45 #include <strstream.h> 46 #include <sys/stat.h> /* stat() */ 47 #include <sys/types.h> 48 #include <sys/wait.h> 49 #include <unistd.h> 50 #include <errno.h> 51 52 /* 53 * Defined macros 54 */ 55 56 #define AVO_BLOCK_INTERUPTS sigfillset(&newset) ; \ 57 sigprocmask(SIG_SETMASK, &newset, &oldset) 58 59 #define AVO_UNBLOCK_INTERUPTS \ 60 sigprocmask(SIG_SETMASK, &oldset, &newset) 61 62 63 /* 64 * typedefs & structs 65 */ 66 67 /* 68 * Static variables 69 */ 70 int dmake_ifd; 71 FILE* dmake_ifp; 72 XDR xdrs_in; 73 74 int dmake_ofd; 75 FILE* dmake_ofp; 76 XDR xdrs_out; 77 78 // These instances are required for the RWfactory. 79 static Avo_JobResultMsg dummyJobResultMsg; 80 static Avo_AcknowledgeMsg dummyAcknowledgeMsg; 81 static int firstAcknowledgeReceived = 0; 82 83 int rxmPid = 0; 84 85 /* 86 * File table of contents 87 */ 88 static void set_dmake_env_vars(void); 89 90 /* 91 * void 92 * startup_rxm(void) 93 * 94 * When startup_rxm() is called, read_command_options() and 95 * read_files_and_state() have already been read, so DMake 96 * will now know what options to pass to rxm. 97 * 98 * rxm 99 * [ -k ] [ -n ] 100 * [ -c <dmake_rcfile> ] 101 * [ -g <dmake_group> ] 102 * [ -j <dmake_max_jobs> ] 103 * [ -m <dmake_mode> ] 104 * [ -o <dmake_odir> ] 105 * <read_fd> <write_fd> 106 * 107 * rxm will, among other things, read the rc file. 108 * 109 */ 110 void 111 startup_rxm(void) 112 { 113 Name dmake_name; 114 Name dmake_value; 115 Avo_err *find_run_dir_err; 116 int pipe1[2], pipe2[2]; 117 Property prop; 118 char *run_dir; 119 char rxm_command[MAXPATHLEN]; 120 int rxm_debug = 0; 121 122 int length; 123 char * env; 124 125 firstAcknowledgeReceived = 0; 126 /* 127 * Create two pipes, one for dmake->rxm, and one for rxm->dmake. 128 * pipe1 is dmake->rxm, 129 * pipe2 is rxm->dmake. 130 */ 131 if ((pipe(pipe1) < 0) || (pipe(pipe2) < 0)) { 132 fatal(catgets(catd, 1, 245, "pipe() failed: %s"), errmsg(errno)); 133 } 134 135 set_dmake_env_vars(); 136 137 if ((rxmPid = fork()) < 0) { /* error */ 138 fatal(catgets(catd, 1, 246, "fork() failed: %s"), errmsg(errno)); 139 } else if (rxmPid > 0) { /* parent, dmake */ 140 dmake_ofd = pipe1[1]; // write side of pipe 141 if (!(dmake_ofp = fdopen(dmake_ofd, "a"))) { 142 fatal(catgets(catd, 1, 247, "fdopen() failed: %s"), errmsg(errno)); 143 } 144 xdrstdio_create(&xdrs_out, dmake_ofp, XDR_ENCODE); 145 146 dmake_ifd = pipe2[0]; // read side of pipe 147 if (!(dmake_ifp = fdopen(dmake_ifd, "r"))) { 148 fatal(catgets(catd, 1, 248, "fdopen() failed: %s"), errmsg(errno)); 149 } 150 xdrstdio_create(&xdrs_in, dmake_ifp, XDR_DECODE); 151 152 close(pipe1[0]); // read side 153 close(pipe2[1]); // write side 154 } else { /* child, rxm */ 155 close(pipe1[1]); // write side 156 close(pipe2[0]); // read side 157 158 /* Find the run directory of dmake, for rxm. */ 159 find_run_dir_err = avo_find_run_dir(&run_dir); 160 if (find_run_dir_err) { 161 delete find_run_dir_err; 162 /* Use the path to find rxm. */ 163 (void) sprintf(rxm_command, NOCATGETS("rxm")); 164 } else { 165 /* Use the run dir of dmake for rxm. */ 166 (void) sprintf(rxm_command, NOCATGETS("%s/rxm"), run_dir); 167 } 168 169 if (continue_after_error) { 170 (void) strcat(rxm_command, NOCATGETS(" -k")); 171 } 172 if (do_not_exec_rule) { 173 (void) strcat(rxm_command, NOCATGETS(" -n")); 174 } 175 if (rxm_debug) { 176 (void) strcat(rxm_command, NOCATGETS(" -S")); 177 } 178 if (send_mtool_msgs) { 179 (void) sprintf(&rxm_command[strlen(rxm_command)], 180 NOCATGETS(" -O %d"), 181 mtool_msgs_fd); 182 } 183 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_RCFILE")); 184 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH); 185 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) && 186 ((dmake_value = prop->body.macro.value) != NULL)) { 187 (void) sprintf(&rxm_command[strlen(rxm_command)], 188 NOCATGETS(" -c %s"), 189 dmake_value->string_mb); 190 } else { 191 length = 2 + strlen(NOCATGETS("DMAKE_RCFILE")); 192 env = getmem(length); 193 (void) sprintf(env, 194 "%s=", 195 NOCATGETS("DMAKE_RCFILE")); 196 (void) putenv(env); 197 } 198 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_GROUP")); 199 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH); 200 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) && 201 ((dmake_value = prop->body.macro.value) != NULL)) { 202 (void) sprintf(&rxm_command[strlen(rxm_command)], 203 NOCATGETS(" -g %s"), 204 dmake_value->string_mb); 205 } else { 206 length = 2 + strlen(NOCATGETS("DMAKE_GROUP")); 207 env = getmem(length); 208 (void) sprintf(env, 209 "%s=", 210 NOCATGETS("DMAKE_GROUP")); 211 (void) putenv(env); 212 } 213 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_MAX_JOBS")); 214 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH); 215 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) && 216 ((dmake_value = prop->body.macro.value) != NULL)) { 217 (void) sprintf(&rxm_command[strlen(rxm_command)], 218 NOCATGETS(" -j %s"), 219 dmake_value->string_mb); 220 } else { 221 length = 2 + strlen(NOCATGETS("DMAKE_MAX_JOBS")); 222 env = getmem(length); 223 (void) sprintf(env, 224 "%s=", 225 NOCATGETS("DMAKE_MAX_JOBS")); 226 (void) putenv(env); 227 } 228 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_MODE")); 229 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH); 230 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) && 231 ((dmake_value = prop->body.macro.value) != NULL)) { 232 (void) sprintf(&rxm_command[strlen(rxm_command)], 233 NOCATGETS(" -m %s"), 234 dmake_value->string_mb); 235 } else { 236 length = 2 + strlen(NOCATGETS("DMAKE_MODE")); 237 env = getmem(length); 238 (void) sprintf(env, 239 "%s=", 240 NOCATGETS("DMAKE_MODE")); 241 (void) putenv(env); 242 } 243 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_ODIR")); 244 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH); 245 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) && 246 ((dmake_value = prop->body.macro.value) != NULL)) { 247 (void) sprintf(&rxm_command[strlen(rxm_command)], 248 NOCATGETS(" -o %s"), 249 dmake_value->string_mb); 250 } else { 251 length = 2 + strlen(NOCATGETS("DMAKE_ODIR")); 252 env = getmem(length); 253 (void) sprintf(env, 254 "%s=", 255 NOCATGETS("DMAKE_ODIR")); 256 (void) putenv(env); 257 } 258 259 (void) sprintf(&rxm_command[strlen(rxm_command)], 260 NOCATGETS(" %d %d"), 261 pipe1[0], pipe2[1]); 262 execl(NOCATGETS("/usr/bin/sh"), 263 NOCATGETS("sh"), 264 NOCATGETS("-c"), 265 rxm_command, 266 (char *)NULL); 267 _exit(127); 268 } 269 } 270 271 /* 272 * static void 273 * set_dmake_env_vars() 274 * 275 * Sets the DMAKE_* environment variables for rxm and rxs. 276 * DMAKE_PWD 277 * DMAKE_NPWD 278 * DMAKE_UMASK 279 * DMAKE_SHELL 280 */ 281 static void 282 set_dmake_env_vars() 283 { 284 char *current_netpath; 285 char *current_path; 286 static char *env; 287 int length; 288 char netpath[MAXPATHLEN]; 289 mode_t um; 290 char um_buf[MAXPATHLEN]; 291 Name dmake_name; 292 Name dmake_value; 293 Property prop; 294 295 /* Set __DMAKE_REDIRECT_STDERR */ 296 length = 2 + strlen(NOCATGETS("__DMAKE_REDIRECT_STDERR")) + 1; 297 env = getmem(length); 298 (void) sprintf(env, 299 "%s=%s", 300 NOCATGETS("__DMAKE_REDIRECT_STDERR"), 301 out_err_same ? NOCATGETS("0") : NOCATGETS("1")); 302 (void) putenv(env); 303 304 /* Set DMAKE_PWD to the current working directory */ 305 current_path = get_current_path(); 306 length = 2 + strlen(NOCATGETS("DMAKE_PWD")) + strlen(current_path); 307 env = getmem(length); 308 (void) sprintf(env, 309 "%s=%s", 310 NOCATGETS("DMAKE_PWD"), 311 current_path); 312 (void) putenv(env); 313 314 /* Set DMAKE_NPWD to machine:pathname */ 315 if (avo_path_to_netpath(current_path, netpath)) { 316 current_netpath = netpath; 317 } else { 318 current_netpath = current_path; 319 } 320 length = 2 + strlen(NOCATGETS("DMAKE_NPWD")) + strlen(current_netpath); 321 env = getmem(length); 322 (void) sprintf(env, 323 "%s=%s", 324 NOCATGETS("DMAKE_NPWD"), 325 current_netpath); 326 (void) putenv(env); 327 328 /* Set DMAKE_UMASK to the value of umask */ 329 um = umask(0); 330 umask(um); 331 (void) sprintf(um_buf, NOCATGETS("%ul"), um); 332 length = 2 + strlen(NOCATGETS("DMAKE_UMASK")) + strlen(um_buf); 333 env = getmem(length); 334 (void) sprintf(env, 335 "%s=%s", 336 NOCATGETS("DMAKE_UMASK"), 337 um_buf); 338 (void) putenv(env); 339 340 if (((prop = get_prop(shell_name->prop, macro_prop)) != NULL) && 341 ((dmake_value = prop->body.macro.value) != NULL)) { 342 length = 2 + strlen(NOCATGETS("DMAKE_SHELL")) + 343 strlen(dmake_value->string_mb); 344 env = getmem(length); 345 (void) sprintf(env, 346 "%s=%s", 347 NOCATGETS("DMAKE_SHELL"), 348 dmake_value->string_mb); 349 (void) putenv(env); 350 } 351 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_OUTPUT_MODE")); 352 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH); 353 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) && 354 ((dmake_value = prop->body.macro.value) != NULL)) { 355 length = 2 + strlen(NOCATGETS("DMAKE_OUTPUT_MODE")) + 356 strlen(dmake_value->string_mb); 357 env = getmem(length); 358 (void) sprintf(env, 359 "%s=%s", 360 NOCATGETS("DMAKE_OUTPUT_MODE"), 361 dmake_value->string_mb); 362 (void) putenv(env); 363 } 364 } 365 366 /* 367 * void 368 * distribute_rxm(Avo_DoJobMsg *dmake_job_msg) 369 * 370 * Write the DMake rule to be distributed down the pipe to rxm. 371 * 372 */ 373 void 374 distribute_rxm(Avo_DoJobMsg *dmake_job_msg) 375 { 376 /* Add all dynamic env vars to the dmake_job_msg. */ 377 setvar_envvar(dmake_job_msg); 378 379 /* 380 * Copying dosys()... 381 * Stat .make.state to see if we'll need to reread it later 382 */ 383 make_state->stat.time = file_no_time; 384 (void)exists(make_state); 385 make_state_before = make_state->stat.time; 386 387 // Wait for the first Acknowledge message from the rxm process 388 // before sending the first message. 389 if (!firstAcknowledgeReceived) { 390 firstAcknowledgeReceived++; 391 Avo_AcknowledgeMsg *msg = getAcknowledgeMsg(); 392 if (msg) { 393 delete msg; 394 } 395 } 396 397 RWCollectable *doJobMsg = (RWCollectable *)dmake_job_msg; 398 sigset_t newset; 399 sigset_t oldset; 400 401 AVO_BLOCK_INTERUPTS; 402 int xdrResult = xdr(&xdrs_out, doJobMsg); 403 404 if (xdrResult) { 405 fflush(dmake_ofp); 406 AVO_UNBLOCK_INTERUPTS; 407 } else { 408 AVO_UNBLOCK_INTERUPTS; 409 fatal(catgets(catd, 1, 249, "Couldn't send the job request to rxm")); 410 } 411 412 delete dmake_job_msg; 413 } 414 415 // Queue for JobResult messages. 416 static RWSlistCollectablesQueue jobResultQueue; 417 418 // Queue for Acknowledge messages. 419 static RWSlistCollectablesQueue acknowledgeQueue; 420 421 // Read a message from the rxm process, and put it into a queue, by 422 // message type. Return the message type. 423 424 int 425 getRxmMessage(void) 426 { 427 RWCollectable *msg = (RWCollectable *)0; 428 int msgType = 0; 429 // sigset_t newset; 430 // sigset_t oldset; 431 432 // It seems unnecessarily to block interrupts here because 433 // any nonignored signal means exit for dmake in distributed mode. 434 // AVO_BLOCK_INTERUPTS; 435 int xdrResult = xdr(&xdrs_in, msg); 436 // AVO_UNBLOCK_INTERUPTS; 437 438 if (xdrResult) { 439 switch(msg->isA()) { 440 case __AVO_ACKNOWLEDGEMSG: 441 acknowledgeQueue.append(msg); 442 msgType = __AVO_ACKNOWLEDGEMSG; 443 break; 444 case __AVO_JOBRESULTMSG: 445 jobResultQueue.append(msg); 446 msgType = __AVO_JOBRESULTMSG; 447 break; 448 default: 449 warning(catgets(catd, 1, 291, "Unknown message on rxm input fd")); 450 msgType = 0; 451 break; 452 } 453 } else { 454 if (errno == EINTR) { 455 fputs(NOCATGETS("dmake: Internal error: xdr() has been interrupted by a signal.\n"), stderr); 456 } 457 fatal(catgets(catd, 1, 250, "Couldn't receive message from rxm")); 458 } 459 460 return msgType; 461 } 462 463 // Get a JobResult message from it's queue, and 464 // if the queue is empty, call the getRxmMessage() function until 465 // a JobResult message shows. 466 467 Avo_JobResultMsg * 468 getJobResultMsg(void) 469 { 470 RWCollectable *msg = 0; 471 472 if (!(msg = jobResultQueue.get())) { 473 while (getRxmMessage() != __AVO_JOBRESULTMSG); 474 msg = jobResultQueue.get(); 475 } 476 477 return (Avo_JobResultMsg *)msg; 478 } 479 480 // Get an Acknowledge message from it's queue, and 481 // if the queue is empty, call the getRxmMessage() function until 482 // a Acknowledge message shows. 483 484 Avo_AcknowledgeMsg * 485 getAcknowledgeMsg(void) 486 { 487 RWCollectable *msg = 0; 488 489 if (!(msg = acknowledgeQueue.get())) { 490 while (getRxmMessage() != __AVO_ACKNOWLEDGEMSG); 491 msg = acknowledgeQueue.get(); 492 } 493 494 return (Avo_AcknowledgeMsg *)msg; 495 } 496 497 498 /* 499 * Doname 500 * await_dist(Boolean waitflg) 501 * 502 * Waits for distributed children to exit and finishes their processing. 503 * Rxm will send a msg down the pipe when a child is done. 504 * 505 */ 506 Doname 507 await_dist(Boolean waitflg) 508 { 509 Avo_JobResultMsg *dmake_result_msg; 510 int job_msg_id; 511 Doname result = build_ok; 512 int result_msg_cmd_status; 513 int result_msg_job_status; 514 Running rp; 515 516 while (!(dmake_result_msg = getJobResultMsg())); 517 job_msg_id = dmake_result_msg->getId(); 518 result_msg_cmd_status = dmake_result_msg->getCmdStatus(); 519 result_msg_job_status = dmake_result_msg->getJobStatus(); 520 521 if (waitflg) { 522 result = (result_msg_cmd_status == 0) ? build_ok : build_failed; 523 524 #ifdef PRINT_EXIT_STATUS 525 if (result == build_ok) { 526 warning(NOCATGETS("I'm in await_dist(), waitflg is true, and result is build_ok.")); 527 } else { 528 warning(NOCATGETS("I'm in await_dist(), waitflg is true, and result is build_failed.")); 529 } 530 #endif 531 532 } else { 533 for (rp = running_list; 534 (rp != NULL) && (job_msg_id != rp->job_msg_id); 535 rp = rp->next) { 536 } 537 if (rp == NULL) { 538 fatal(catgets(catd, 1, 251, "Internal error: returned child job_msg_id not in running_list")); 539 } else { 540 /* XXX - This may not be correct! */ 541 if (result_msg_job_status == RETURNED) { 542 rp->state = build_ok; 543 } else { 544 rp->state = (result_msg_cmd_status == 0) ? build_ok : build_failed; 545 } 546 result = rp->state; 547 548 #ifdef PRINT_EXIT_STATUS 549 if (result == build_ok) { 550 warning(NOCATGETS("I'm in await_dist(), waitflg is false, and result is build_ok.")); 551 } else { 552 warning(NOCATGETS("I'm in await_dist(), waitflg is false, and result is build_failed.")); 553 } 554 #endif 555 556 } 557 parallel_process_cnt--; 558 } 559 delete dmake_result_msg; 560 return result; 561 } 562 563 #endif 564 565