1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21 /*
22 * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
23 * Use is subject to license terms.
24 */
25 /*
26 * @(#)dist.cc 1.35 06/12/12
27 */
28
29 #pragma ident "@(#)dist.cc 1.25 96/03/12"
30
31 #ifdef DISTRIBUTED
32 /*
33 * dist.cc
34 *
35 * Deal with the distributed processing
36 */
37
38 #include <avo/err.h>
39 #include <avo/find_dir.h>
40 #include <avo/util.h>
41 #include <dm/Avo_AcknowledgeMsg.h>
42 #include <dm/Avo_DoJobMsg.h>
43 #include <dm/Avo_JobResultMsg.h>
44 #include <mk/defs.h>
45 #include <mksh/misc.h> /* getmem() */
46 #include <rw/pstream.h>
47 #include <rw/queuecol.h>
48 #include <rw/xdrstrea.h>
49 #include <signal.h>
50 #ifdef linux
51 #include <sstream>
52 using namespace std;
53 #else
54 #include <strstream.h>
55 #endif
56 #include <sys/stat.h> /* stat() */
57 #include <sys/types.h>
58 #include <sys/wait.h>
59 #include <unistd.h>
60 #include <errno.h>
61
62 /*
63 * Defined macros
64 */
65
66 #define AVO_BLOCK_INTERUPTS sigfillset(&newset) ; \
67 sigprocmask(SIG_SETMASK, &newset, &oldset)
68
69 #define AVO_UNBLOCK_INTERUPTS \
70 sigprocmask(SIG_SETMASK, &oldset, &newset)
71
72
73 /*
74 * typedefs & structs
75 */
76
77 /*
78 * Static variables
79 */
80 int dmake_ifd;
81 FILE* dmake_ifp;
82 XDR xdrs_in;
83
84 int dmake_ofd;
85 FILE* dmake_ofp;
86 XDR xdrs_out;
87
88 // These instances are required for the RWfactory.
89 static Avo_JobResultMsg dummyJobResultMsg;
90 static Avo_AcknowledgeMsg dummyAcknowledgeMsg;
91 static int firstAcknowledgeReceived = 0;
92
93 int rxmPid = 0;
94
95 /*
96 * File table of contents
97 */
98 static void set_dmake_env_vars(void);
99
100 /*
101 * void
102 * startup_rxm(void)
103 *
104 * When startup_rxm() is called, read_command_options() and
105 * read_files_and_state() have already been read, so DMake
106 * will now know what options to pass to rxm.
107 *
108 * rxm
109 * [ -k ] [ -n ]
110 * [ -c <dmake_rcfile> ]
111 * [ -g <dmake_group> ]
112 * [ -j <dmake_max_jobs> ]
113 * [ -m <dmake_mode> ]
114 * [ -o <dmake_odir> ]
115 * <read_fd> <write_fd>
116 *
117 * rxm will, among other things, read the rc file.
118 *
119 */
120 void
121 startup_rxm(void)
122 {
123 Name dmake_name;
124 Name dmake_value;
125 Avo_err *find_run_dir_err;
126 int pipe1[2], pipe2[2];
127 Property prop;
128 char *run_dir;
129 char rxm_command[MAXPATHLEN];
130 int rxm_debug = 0;
131
132 int length;
133 char * env;
134
135 firstAcknowledgeReceived = 0;
136 /*
137 * Create two pipes, one for dmake->rxm, and one for rxm->dmake.
138 * pipe1 is dmake->rxm,
139 * pipe2 is rxm->dmake.
140 */
141 if ((pipe(pipe1) < 0) || (pipe(pipe2) < 0)) {
142 fatal(catgets(catd, 1, 245, "pipe() failed: %s"), errmsg(errno));
143 }
144
145 set_dmake_env_vars();
146
147 if ((rxmPid = fork()) < 0) { /* error */
148 fatal(catgets(catd, 1, 246, "fork() failed: %s"), errmsg(errno));
149 } else if (rxmPid > 0) { /* parent, dmake */
150 dmake_ofd = pipe1[1]; // write side of pipe
151 if (!(dmake_ofp = fdopen(dmake_ofd, "a"))) {
152 fatal(catgets(catd, 1, 247, "fdopen() failed: %s"), errmsg(errno));
153 }
154 xdrstdio_create(&xdrs_out, dmake_ofp, XDR_ENCODE);
155
156 dmake_ifd = pipe2[0]; // read side of pipe
157 if (!(dmake_ifp = fdopen(dmake_ifd, "r"))) {
158 fatal(catgets(catd, 1, 248, "fdopen() failed: %s"), errmsg(errno));
159 }
160 xdrstdio_create(&xdrs_in, dmake_ifp, XDR_DECODE);
161
162 close(pipe1[0]); // read side
163 close(pipe2[1]); // write side
164 } else { /* child, rxm */
165 close(pipe1[1]); // write side
166 close(pipe2[0]); // read side
167
168 /* Find the run directory of dmake, for rxm. */
169 find_run_dir_err = avo_find_run_dir(&run_dir);
170 if (find_run_dir_err) {
171 delete find_run_dir_err;
172 /* Use the path to find rxm. */
173 (void) sprintf(rxm_command, NOCATGETS("rxm"));
174 } else {
175 /* Use the run dir of dmake for rxm. */
176 (void) sprintf(rxm_command, NOCATGETS("%s/rxm"), run_dir);
177 }
178
179 if (continue_after_error) {
180 (void) strcat(rxm_command, NOCATGETS(" -k"));
181 }
182 if (do_not_exec_rule) {
183 (void) strcat(rxm_command, NOCATGETS(" -n"));
184 }
185 if (rxm_debug) {
186 (void) strcat(rxm_command, NOCATGETS(" -S"));
187 }
188 if (send_mtool_msgs) {
189 (void) sprintf(&rxm_command[strlen(rxm_command)],
190 NOCATGETS(" -O %d"),
191 mtool_msgs_fd);
192 }
193 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_RCFILE"));
194 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
195 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
196 ((dmake_value = prop->body.macro.value) != NULL)) {
197 (void) sprintf(&rxm_command[strlen(rxm_command)],
198 NOCATGETS(" -c %s"),
199 dmake_value->string_mb);
200 } else {
201 length = 2 + strlen(NOCATGETS("DMAKE_RCFILE"));
202 env = getmem(length);
203 (void) sprintf(env,
204 "%s=",
205 NOCATGETS("DMAKE_RCFILE"));
206 (void) putenv(env);
207 }
208 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_GROUP"));
209 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
210 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
211 ((dmake_value = prop->body.macro.value) != NULL)) {
212 (void) sprintf(&rxm_command[strlen(rxm_command)],
213 NOCATGETS(" -g %s"),
214 dmake_value->string_mb);
215 } else {
216 length = 2 + strlen(NOCATGETS("DMAKE_GROUP"));
217 env = getmem(length);
218 (void) sprintf(env,
219 "%s=",
220 NOCATGETS("DMAKE_GROUP"));
221 (void) putenv(env);
222 }
223 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_MAX_JOBS"));
224 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
225 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
226 ((dmake_value = prop->body.macro.value) != NULL)) {
227 (void) sprintf(&rxm_command[strlen(rxm_command)],
228 NOCATGETS(" -j %s"),
229 dmake_value->string_mb);
230 } else {
231 length = 2 + strlen(NOCATGETS("DMAKE_MAX_JOBS"));
232 env = getmem(length);
233 (void) sprintf(env,
234 "%s=",
235 NOCATGETS("DMAKE_MAX_JOBS"));
236 (void) putenv(env);
237 }
238 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_MODE"));
239 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
240 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
241 ((dmake_value = prop->body.macro.value) != NULL)) {
242 (void) sprintf(&rxm_command[strlen(rxm_command)],
243 NOCATGETS(" -m %s"),
244 dmake_value->string_mb);
245 } else {
246 length = 2 + strlen(NOCATGETS("DMAKE_MODE"));
247 env = getmem(length);
248 (void) sprintf(env,
249 "%s=",
250 NOCATGETS("DMAKE_MODE"));
251 (void) putenv(env);
252 }
253 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_ODIR"));
254 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
255 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
256 ((dmake_value = prop->body.macro.value) != NULL)) {
257 (void) sprintf(&rxm_command[strlen(rxm_command)],
258 NOCATGETS(" -o %s"),
259 dmake_value->string_mb);
260 } else {
261 length = 2 + strlen(NOCATGETS("DMAKE_ODIR"));
262 env = getmem(length);
263 (void) sprintf(env,
264 "%s=",
265 NOCATGETS("DMAKE_ODIR"));
266 (void) putenv(env);
267 }
268
269 (void) sprintf(&rxm_command[strlen(rxm_command)],
270 NOCATGETS(" %d %d"),
271 pipe1[0], pipe2[1]);
272 #ifdef linux
273 execl(NOCATGETS("/bin/sh"),
274 #else
275 execl(NOCATGETS("/usr/bin/sh"),
276 #endif
277 NOCATGETS("sh"),
278 NOCATGETS("-c"),
279 rxm_command,
280 (char *)NULL);
281 _exit(127);
282 }
283 }
284
285 /*
286 * static void
287 * set_dmake_env_vars()
288 *
289 * Sets the DMAKE_* environment variables for rxm and rxs.
290 * DMAKE_PWD
291 * DMAKE_NPWD
292 * DMAKE_UMASK
293 * DMAKE_SHELL
294 */
295 static void
296 set_dmake_env_vars()
297 {
298 char *current_netpath;
299 char *current_path;
300 static char *env;
301 int length;
302 char netpath[MAXPATHLEN];
303 mode_t um;
304 char um_buf[MAXPATHLEN];
305 Name dmake_name;
306 Name dmake_value;
307 Property prop;
308
309 #ifdef REDIRECT_ERR
310 /* Set __DMAKE_REDIRECT_STDERR */
311 length = 2 + strlen(NOCATGETS("__DMAKE_REDIRECT_STDERR")) + 1;
312 env = getmem(length);
313 (void) sprintf(env,
314 "%s=%s",
315 NOCATGETS("__DMAKE_REDIRECT_STDERR"),
316 out_err_same ? NOCATGETS("0") : NOCATGETS("1"));
317 (void) putenv(env);
318 #endif
319
320 /* Set DMAKE_PWD to the current working directory */
321 current_path = get_current_path();
322 length = 2 + strlen(NOCATGETS("DMAKE_PWD")) + strlen(current_path);
323 env = getmem(length);
324 (void) sprintf(env,
325 "%s=%s",
326 NOCATGETS("DMAKE_PWD"),
327 current_path);
328 (void) putenv(env);
329
330 /* Set DMAKE_NPWD to machine:pathname */
331 if (avo_path_to_netpath(current_path, netpath)) {
332 current_netpath = netpath;
333 } else {
334 current_netpath = current_path;
335 }
336 length = 2 + strlen(NOCATGETS("DMAKE_NPWD")) + strlen(current_netpath);
337 env = getmem(length);
338 (void) sprintf(env,
339 "%s=%s",
340 NOCATGETS("DMAKE_NPWD"),
341 current_netpath);
342 (void) putenv(env);
343
344 /* Set DMAKE_UMASK to the value of umask */
345 um = umask(0);
346 umask(um);
347 (void) sprintf(um_buf, NOCATGETS("%ul"), um);
348 length = 2 + strlen(NOCATGETS("DMAKE_UMASK")) + strlen(um_buf);
349 env = getmem(length);
350 (void) sprintf(env,
351 "%s=%s",
352 NOCATGETS("DMAKE_UMASK"),
353 um_buf);
354 (void) putenv(env);
355
356 if (((prop = get_prop(shell_name->prop, macro_prop)) != NULL) &&
357 ((dmake_value = prop->body.macro.value) != NULL)) {
358 length = 2 + strlen(NOCATGETS("DMAKE_SHELL")) +
359 strlen(dmake_value->string_mb);
360 env = getmem(length);
361 (void) sprintf(env,
362 "%s=%s",
363 NOCATGETS("DMAKE_SHELL"),
364 dmake_value->string_mb);
365 (void) putenv(env);
366 }
367 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_OUTPUT_MODE"));
368 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
369 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
370 ((dmake_value = prop->body.macro.value) != NULL)) {
371 length = 2 + strlen(NOCATGETS("DMAKE_OUTPUT_MODE")) +
372 strlen(dmake_value->string_mb);
373 env = getmem(length);
374 (void) sprintf(env,
375 "%s=%s",
376 NOCATGETS("DMAKE_OUTPUT_MODE"),
377 dmake_value->string_mb);
378 (void) putenv(env);
379 }
380 }
381
382 /*
383 * void
384 * distribute_rxm(Avo_DoJobMsg *dmake_job_msg)
385 *
386 * Write the DMake rule to be distributed down the pipe to rxm.
387 *
388 */
389 void
390 distribute_rxm(Avo_DoJobMsg *dmake_job_msg)
391 {
392 /* Add all dynamic env vars to the dmake_job_msg. */
393 setvar_envvar(dmake_job_msg);
394
395 /*
396 * Copying dosys()...
397 * Stat .make.state to see if we'll need to reread it later
398 */
399 make_state->stat.time = file_no_time;
400 (void)exists(make_state);
401 make_state_before = make_state->stat.time;
402
403 // Wait for the first Acknowledge message from the rxm process
404 // before sending the first message.
405 if (!firstAcknowledgeReceived) {
406 firstAcknowledgeReceived++;
407 Avo_AcknowledgeMsg *msg = getAcknowledgeMsg();
408 if (msg) {
409 delete msg;
410 }
411 }
412
413 RWCollectable *doJobMsg = (RWCollectable *)dmake_job_msg;
414 sigset_t newset;
415 sigset_t oldset;
416
417 AVO_BLOCK_INTERUPTS;
418 int xdrResult = xdr(&xdrs_out, doJobMsg);
419
420 if (xdrResult) {
421 fflush(dmake_ofp);
422 AVO_UNBLOCK_INTERUPTS;
423 } else {
424 AVO_UNBLOCK_INTERUPTS;
425 fatal(catgets(catd, 1, 249, "Couldn't send the job request to rxm"));
426 }
427
428 delete dmake_job_msg;
429 }
430
431 // Queue for JobResult messages.
432 static RWSlistCollectablesQueue jobResultQueue;
433
434 // Queue for Acknowledge messages.
435 static RWSlistCollectablesQueue acknowledgeQueue;
436
437 // Read a message from the rxm process, and put it into a queue, by
438 // message type. Return the message type.
439
440 int
441 getRxmMessage(void)
442 {
443 RWCollectable *msg = (RWCollectable *)0;
444 int msgType = 0;
445 // sigset_t newset;
446 // sigset_t oldset;
447
448 // It seems unnecessarily to block interrupts here because
449 // any nonignored signal means exit for dmake in distributed mode.
450 // AVO_BLOCK_INTERUPTS;
451 int xdrResult = xdr(&xdrs_in, msg);
452 // AVO_UNBLOCK_INTERUPTS;
453
454 if (xdrResult) {
455 switch(msg->isA()) {
456 case __AVO_ACKNOWLEDGEMSG:
457 acknowledgeQueue.append(msg);
458 msgType = __AVO_ACKNOWLEDGEMSG;
459 break;
460 case __AVO_JOBRESULTMSG:
461 jobResultQueue.append(msg);
462 msgType = __AVO_JOBRESULTMSG;
463 break;
464 default:
465 warning(catgets(catd, 1, 291, "Unknown message on rxm input fd"));
466 msgType = 0;
467 break;
468 }
469 } else {
470 if (errno == EINTR) {
471 fputs(NOCATGETS("dmake: Internal error: xdr() has been interrupted by a signal.\n"), stderr);
472 }
473 fatal(catgets(catd, 1, 250, "Couldn't receive message from rxm"));
474 }
475
476 return msgType;
477 }
478
479 // Get a JobResult message from it's queue, and
480 // if the queue is empty, call the getRxmMessage() function until
481 // a JobResult message shows.
482
483 Avo_JobResultMsg *
484 getJobResultMsg(void)
485 {
486 RWCollectable *msg = 0;
487
488 if (!(msg = jobResultQueue.get())) {
489 while (getRxmMessage() != __AVO_JOBRESULTMSG);
490 msg = jobResultQueue.get();
491 }
492
493 return (Avo_JobResultMsg *)msg;
494 }
495
496 // Get an Acknowledge message from it's queue, and
497 // if the queue is empty, call the getRxmMessage() function until
498 // a Acknowledge message shows.
499
500 Avo_AcknowledgeMsg *
501 getAcknowledgeMsg(void)
502 {
503 RWCollectable *msg = 0;
504
505 if (!(msg = acknowledgeQueue.get())) {
506 while (getRxmMessage() != __AVO_ACKNOWLEDGEMSG);
507 msg = acknowledgeQueue.get();
508 }
509
510 return (Avo_AcknowledgeMsg *)msg;
511 }
512
513
514 /*
515 * Doname
516 * await_dist(Boolean waitflg)
517 *
518 * Waits for distributed children to exit and finishes their processing.
519 * Rxm will send a msg down the pipe when a child is done.
520 *
521 */
522 Doname
523 await_dist(Boolean waitflg)
524 {
525 Avo_JobResultMsg *dmake_result_msg;
526 int job_msg_id;
527 Doname result = build_ok;
528 int result_msg_cmd_status;
529 int result_msg_job_status;
530 Running rp;
531
532 while (!(dmake_result_msg = getJobResultMsg()));
533 job_msg_id = dmake_result_msg->getId();
534 result_msg_cmd_status = dmake_result_msg->getCmdStatus();
535 result_msg_job_status = dmake_result_msg->getJobStatus();
536
537 if (waitflg) {
538 result = (result_msg_cmd_status == 0) ? build_ok : build_failed;
539
540 #ifdef PRINT_EXIT_STATUS
541 if (result == build_ok) {
542 warning(NOCATGETS("I'm in await_dist(), waitflg is true, and result is build_ok."));
543 } else {
544 warning(NOCATGETS("I'm in await_dist(), waitflg is true, and result is build_failed."));
545 }
546 #endif
547
548 } else {
549 for (rp = running_list;
550 (rp != NULL) && (job_msg_id != rp->job_msg_id);
551 rp = rp->next) {
552 }
553 if (rp == NULL) {
554 fatal(catgets(catd, 1, 251, "Internal error: returned child job_msg_id not in running_list"));
555 } else {
556 /* XXX - This may not be correct! */
557 if (result_msg_job_status == RETURNED) {
558 rp->state = build_ok;
559 } else {
560 rp->state = (result_msg_cmd_status == 0) ? build_ok : build_failed;
561 }
562 result = rp->state;
563
564 #ifdef PRINT_EXIT_STATUS
565 if (result == build_ok) {
566 warning(NOCATGETS("I'm in await_dist(), waitflg is false, and result is build_ok."));
567 } else {
568 warning(NOCATGETS("I'm in await_dist(), waitflg is false, and result is build_failed."));
569 }
570 #endif
571
572 }
573 parallel_process_cnt--;
574 }
575 delete dmake_result_msg;
576 return result;
577 }
578
579 #endif
580
581