1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21 /*
22 * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
23 * Use is subject to license terms.
24 */
25
26 #ifdef DISTRIBUTED
27 /*
28 * dist.cc
29 *
30 * Deal with the distributed processing
31 */
32
33 #include <avo/err.h>
34 #include <avo/find_dir.h>
35 #include <avo/util.h>
36 #include <dm/Avo_AcknowledgeMsg.h>
37 #include <dm/Avo_DoJobMsg.h>
38 #include <dm/Avo_JobResultMsg.h>
39 #include <mk/defs.h>
40 #include <mksh/misc.h> /* getmem() */
41 #include <rw/pstream.h>
42 #include <rw/queuecol.h>
43 #include <rw/xdrstrea.h>
44 #include <signal.h>
45 #include <strstream.h>
46 #include <sys/stat.h> /* stat() */
47 #include <sys/types.h>
48 #include <sys/wait.h>
49 #include <unistd.h>
50 #include <errno.h>
51
52 /*
53 * Defined macros
54 */
55
56 #define AVO_BLOCK_INTERUPTS sigfillset(&newset) ; \
57 sigprocmask(SIG_SETMASK, &newset, &oldset)
58
59 #define AVO_UNBLOCK_INTERUPTS \
60 sigprocmask(SIG_SETMASK, &oldset, &newset)
61
62
63 /*
64 * typedefs & structs
65 */
66
67 /*
68 * Static variables
69 */
70 int dmake_ifd;
71 FILE* dmake_ifp;
72 XDR xdrs_in;
73
74 int dmake_ofd;
75 FILE* dmake_ofp;
76 XDR xdrs_out;
77
78 // These instances are required for the RWfactory.
79 static Avo_JobResultMsg dummyJobResultMsg;
80 static Avo_AcknowledgeMsg dummyAcknowledgeMsg;
81 static int firstAcknowledgeReceived = 0;
82
83 int rxmPid = 0;
84
85 /*
86 * File table of contents
87 */
88 static void set_dmake_env_vars(void);
89
90 /*
91 * void
92 * startup_rxm(void)
93 *
94 * When startup_rxm() is called, read_command_options() and
95 * read_files_and_state() have already been read, so DMake
96 * will now know what options to pass to rxm.
97 *
98 * rxm
99 * [ -k ] [ -n ]
100 * [ -c <dmake_rcfile> ]
101 * [ -g <dmake_group> ]
102 * [ -j <dmake_max_jobs> ]
103 * [ -m <dmake_mode> ]
104 * [ -o <dmake_odir> ]
105 * <read_fd> <write_fd>
106 *
107 * rxm will, among other things, read the rc file.
108 *
109 */
110 void
111 startup_rxm(void)
112 {
113 Name dmake_name;
114 Name dmake_value;
115 Avo_err *find_run_dir_err;
116 int pipe1[2], pipe2[2];
117 Property prop;
118 char *run_dir;
119 char rxm_command[MAXPATHLEN];
120 int rxm_debug = 0;
121
122 int length;
123 char * env;
124
125 firstAcknowledgeReceived = 0;
126 /*
127 * Create two pipes, one for dmake->rxm, and one for rxm->dmake.
128 * pipe1 is dmake->rxm,
129 * pipe2 is rxm->dmake.
130 */
131 if ((pipe(pipe1) < 0) || (pipe(pipe2) < 0)) {
132 fatal(catgets(catd, 1, 245, "pipe() failed: %s"), errmsg(errno));
133 }
134
135 set_dmake_env_vars();
136
137 if ((rxmPid = fork()) < 0) { /* error */
138 fatal(catgets(catd, 1, 246, "fork() failed: %s"), errmsg(errno));
139 } else if (rxmPid > 0) { /* parent, dmake */
140 dmake_ofd = pipe1[1]; // write side of pipe
141 if (!(dmake_ofp = fdopen(dmake_ofd, "a"))) {
142 fatal(catgets(catd, 1, 247, "fdopen() failed: %s"), errmsg(errno));
143 }
144 xdrstdio_create(&xdrs_out, dmake_ofp, XDR_ENCODE);
145
146 dmake_ifd = pipe2[0]; // read side of pipe
147 if (!(dmake_ifp = fdopen(dmake_ifd, "r"))) {
148 fatal(catgets(catd, 1, 248, "fdopen() failed: %s"), errmsg(errno));
149 }
150 xdrstdio_create(&xdrs_in, dmake_ifp, XDR_DECODE);
151
152 close(pipe1[0]); // read side
153 close(pipe2[1]); // write side
154 } else { /* child, rxm */
155 close(pipe1[1]); // write side
156 close(pipe2[0]); // read side
157
158 /* Find the run directory of dmake, for rxm. */
159 find_run_dir_err = avo_find_run_dir(&run_dir);
160 if (find_run_dir_err) {
161 delete find_run_dir_err;
162 /* Use the path to find rxm. */
163 (void) sprintf(rxm_command, NOCATGETS("rxm"));
164 } else {
165 /* Use the run dir of dmake for rxm. */
166 (void) sprintf(rxm_command, NOCATGETS("%s/rxm"), run_dir);
167 }
168
169 if (continue_after_error) {
170 (void) strcat(rxm_command, NOCATGETS(" -k"));
171 }
172 if (do_not_exec_rule) {
173 (void) strcat(rxm_command, NOCATGETS(" -n"));
174 }
175 if (rxm_debug) {
176 (void) strcat(rxm_command, NOCATGETS(" -S"));
177 }
178 if (send_mtool_msgs) {
179 (void) sprintf(&rxm_command[strlen(rxm_command)],
180 NOCATGETS(" -O %d"),
181 mtool_msgs_fd);
182 }
183 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_RCFILE"));
184 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
185 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
186 ((dmake_value = prop->body.macro.value) != NULL)) {
187 (void) sprintf(&rxm_command[strlen(rxm_command)],
188 NOCATGETS(" -c %s"),
189 dmake_value->string_mb);
190 } else {
191 length = 2 + strlen(NOCATGETS("DMAKE_RCFILE"));
192 env = getmem(length);
193 (void) sprintf(env,
194 "%s=",
195 NOCATGETS("DMAKE_RCFILE"));
196 (void) putenv(env);
197 }
198 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_GROUP"));
199 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
200 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
201 ((dmake_value = prop->body.macro.value) != NULL)) {
202 (void) sprintf(&rxm_command[strlen(rxm_command)],
203 NOCATGETS(" -g %s"),
204 dmake_value->string_mb);
205 } else {
206 length = 2 + strlen(NOCATGETS("DMAKE_GROUP"));
207 env = getmem(length);
208 (void) sprintf(env,
209 "%s=",
210 NOCATGETS("DMAKE_GROUP"));
211 (void) putenv(env);
212 }
213 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_MAX_JOBS"));
214 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
215 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
216 ((dmake_value = prop->body.macro.value) != NULL)) {
217 (void) sprintf(&rxm_command[strlen(rxm_command)],
218 NOCATGETS(" -j %s"),
219 dmake_value->string_mb);
220 } else {
221 length = 2 + strlen(NOCATGETS("DMAKE_MAX_JOBS"));
222 env = getmem(length);
223 (void) sprintf(env,
224 "%s=",
225 NOCATGETS("DMAKE_MAX_JOBS"));
226 (void) putenv(env);
227 }
228 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_MODE"));
229 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
230 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
231 ((dmake_value = prop->body.macro.value) != NULL)) {
232 (void) sprintf(&rxm_command[strlen(rxm_command)],
233 NOCATGETS(" -m %s"),
234 dmake_value->string_mb);
235 } else {
236 length = 2 + strlen(NOCATGETS("DMAKE_MODE"));
237 env = getmem(length);
238 (void) sprintf(env,
239 "%s=",
240 NOCATGETS("DMAKE_MODE"));
241 (void) putenv(env);
242 }
243 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_ODIR"));
244 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
245 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
246 ((dmake_value = prop->body.macro.value) != NULL)) {
247 (void) sprintf(&rxm_command[strlen(rxm_command)],
248 NOCATGETS(" -o %s"),
249 dmake_value->string_mb);
250 } else {
251 length = 2 + strlen(NOCATGETS("DMAKE_ODIR"));
252 env = getmem(length);
253 (void) sprintf(env,
254 "%s=",
255 NOCATGETS("DMAKE_ODIR"));
256 (void) putenv(env);
257 }
258
259 (void) sprintf(&rxm_command[strlen(rxm_command)],
260 NOCATGETS(" %d %d"),
261 pipe1[0], pipe2[1]);
262 execl(NOCATGETS("/usr/bin/sh"),
263 NOCATGETS("sh"),
264 NOCATGETS("-c"),
265 rxm_command,
266 (char *)NULL);
267 _exit(127);
268 }
269 }
270
271 /*
272 * static void
273 * set_dmake_env_vars()
274 *
275 * Sets the DMAKE_* environment variables for rxm and rxs.
276 * DMAKE_PWD
277 * DMAKE_NPWD
278 * DMAKE_UMASK
279 * DMAKE_SHELL
280 */
281 static void
282 set_dmake_env_vars()
283 {
284 char *current_netpath;
285 char *current_path;
286 static char *env;
287 int length;
288 char netpath[MAXPATHLEN];
289 mode_t um;
290 char um_buf[MAXPATHLEN];
291 Name dmake_name;
292 Name dmake_value;
293 Property prop;
294
295 /* Set __DMAKE_REDIRECT_STDERR */
296 length = 2 + strlen(NOCATGETS("__DMAKE_REDIRECT_STDERR")) + 1;
297 env = getmem(length);
298 (void) sprintf(env,
299 "%s=%s",
300 NOCATGETS("__DMAKE_REDIRECT_STDERR"),
301 out_err_same ? NOCATGETS("0") : NOCATGETS("1"));
302 (void) putenv(env);
303
304 /* Set DMAKE_PWD to the current working directory */
305 current_path = get_current_path();
306 length = 2 + strlen(NOCATGETS("DMAKE_PWD")) + strlen(current_path);
307 env = getmem(length);
308 (void) sprintf(env,
309 "%s=%s",
310 NOCATGETS("DMAKE_PWD"),
311 current_path);
312 (void) putenv(env);
313
314 /* Set DMAKE_NPWD to machine:pathname */
315 if (avo_path_to_netpath(current_path, netpath)) {
316 current_netpath = netpath;
317 } else {
318 current_netpath = current_path;
319 }
320 length = 2 + strlen(NOCATGETS("DMAKE_NPWD")) + strlen(current_netpath);
321 env = getmem(length);
322 (void) sprintf(env,
323 "%s=%s",
324 NOCATGETS("DMAKE_NPWD"),
325 current_netpath);
326 (void) putenv(env);
327
328 /* Set DMAKE_UMASK to the value of umask */
329 um = umask(0);
330 umask(um);
331 (void) sprintf(um_buf, NOCATGETS("%ul"), um);
332 length = 2 + strlen(NOCATGETS("DMAKE_UMASK")) + strlen(um_buf);
333 env = getmem(length);
334 (void) sprintf(env,
335 "%s=%s",
336 NOCATGETS("DMAKE_UMASK"),
337 um_buf);
338 (void) putenv(env);
339
340 if (((prop = get_prop(shell_name->prop, macro_prop)) != NULL) &&
341 ((dmake_value = prop->body.macro.value) != NULL)) {
342 length = 2 + strlen(NOCATGETS("DMAKE_SHELL")) +
343 strlen(dmake_value->string_mb);
344 env = getmem(length);
345 (void) sprintf(env,
346 "%s=%s",
347 NOCATGETS("DMAKE_SHELL"),
348 dmake_value->string_mb);
349 (void) putenv(env);
350 }
351 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_OUTPUT_MODE"));
352 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
353 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
354 ((dmake_value = prop->body.macro.value) != NULL)) {
355 length = 2 + strlen(NOCATGETS("DMAKE_OUTPUT_MODE")) +
356 strlen(dmake_value->string_mb);
357 env = getmem(length);
358 (void) sprintf(env,
359 "%s=%s",
360 NOCATGETS("DMAKE_OUTPUT_MODE"),
361 dmake_value->string_mb);
362 (void) putenv(env);
363 }
364 }
365
366 /*
367 * void
368 * distribute_rxm(Avo_DoJobMsg *dmake_job_msg)
369 *
370 * Write the DMake rule to be distributed down the pipe to rxm.
371 *
372 */
373 void
374 distribute_rxm(Avo_DoJobMsg *dmake_job_msg)
375 {
376 /* Add all dynamic env vars to the dmake_job_msg. */
377 setvar_envvar(dmake_job_msg);
378
379 /*
380 * Copying dosys()...
381 * Stat .make.state to see if we'll need to reread it later
382 */
383 make_state->stat.time = file_no_time;
384 (void)exists(make_state);
385 make_state_before = make_state->stat.time;
386
387 // Wait for the first Acknowledge message from the rxm process
388 // before sending the first message.
389 if (!firstAcknowledgeReceived) {
390 firstAcknowledgeReceived++;
391 Avo_AcknowledgeMsg *msg = getAcknowledgeMsg();
392 if (msg) {
393 delete msg;
394 }
395 }
396
397 RWCollectable *doJobMsg = (RWCollectable *)dmake_job_msg;
398 sigset_t newset;
399 sigset_t oldset;
400
401 AVO_BLOCK_INTERUPTS;
402 int xdrResult = xdr(&xdrs_out, doJobMsg);
403
404 if (xdrResult) {
405 fflush(dmake_ofp);
406 AVO_UNBLOCK_INTERUPTS;
407 } else {
408 AVO_UNBLOCK_INTERUPTS;
409 fatal(catgets(catd, 1, 249, "Couldn't send the job request to rxm"));
410 }
411
412 delete dmake_job_msg;
413 }
414
415 // Queue for JobResult messages.
416 static RWSlistCollectablesQueue jobResultQueue;
417
418 // Queue for Acknowledge messages.
419 static RWSlistCollectablesQueue acknowledgeQueue;
420
421 // Read a message from the rxm process, and put it into a queue, by
422 // message type. Return the message type.
423
424 int
425 getRxmMessage(void)
426 {
427 RWCollectable *msg = (RWCollectable *)0;
428 int msgType = 0;
429 // sigset_t newset;
430 // sigset_t oldset;
431
432 // It seems unnecessarily to block interrupts here because
433 // any nonignored signal means exit for dmake in distributed mode.
434 // AVO_BLOCK_INTERUPTS;
435 int xdrResult = xdr(&xdrs_in, msg);
436 // AVO_UNBLOCK_INTERUPTS;
437
438 if (xdrResult) {
439 switch(msg->isA()) {
440 case __AVO_ACKNOWLEDGEMSG:
441 acknowledgeQueue.append(msg);
442 msgType = __AVO_ACKNOWLEDGEMSG;
443 break;
444 case __AVO_JOBRESULTMSG:
445 jobResultQueue.append(msg);
446 msgType = __AVO_JOBRESULTMSG;
447 break;
448 default:
449 warning(catgets(catd, 1, 291, "Unknown message on rxm input fd"));
450 msgType = 0;
451 break;
452 }
453 } else {
454 if (errno == EINTR) {
455 fputs(NOCATGETS("dmake: Internal error: xdr() has been interrupted by a signal.\n"), stderr);
456 }
457 fatal(catgets(catd, 1, 250, "Couldn't receive message from rxm"));
458 }
459
460 return msgType;
461 }
462
463 // Get a JobResult message from it's queue, and
464 // if the queue is empty, call the getRxmMessage() function until
465 // a JobResult message shows.
466
467 Avo_JobResultMsg *
468 getJobResultMsg(void)
469 {
470 RWCollectable *msg = 0;
471
472 if (!(msg = jobResultQueue.get())) {
473 while (getRxmMessage() != __AVO_JOBRESULTMSG);
474 msg = jobResultQueue.get();
475 }
476
477 return (Avo_JobResultMsg *)msg;
478 }
479
480 // Get an Acknowledge message from it's queue, and
481 // if the queue is empty, call the getRxmMessage() function until
482 // a Acknowledge message shows.
483
484 Avo_AcknowledgeMsg *
485 getAcknowledgeMsg(void)
486 {
487 RWCollectable *msg = 0;
488
489 if (!(msg = acknowledgeQueue.get())) {
490 while (getRxmMessage() != __AVO_ACKNOWLEDGEMSG);
491 msg = acknowledgeQueue.get();
492 }
493
494 return (Avo_AcknowledgeMsg *)msg;
495 }
496
497
498 /*
499 * Doname
500 * await_dist(Boolean waitflg)
501 *
502 * Waits for distributed children to exit and finishes their processing.
503 * Rxm will send a msg down the pipe when a child is done.
504 *
505 */
506 Doname
507 await_dist(Boolean waitflg)
508 {
509 Avo_JobResultMsg *dmake_result_msg;
510 int job_msg_id;
511 Doname result = build_ok;
512 int result_msg_cmd_status;
513 int result_msg_job_status;
514 Running rp;
515
516 while (!(dmake_result_msg = getJobResultMsg()));
517 job_msg_id = dmake_result_msg->getId();
518 result_msg_cmd_status = dmake_result_msg->getCmdStatus();
519 result_msg_job_status = dmake_result_msg->getJobStatus();
520
521 if (waitflg) {
522 result = (result_msg_cmd_status == 0) ? build_ok : build_failed;
523
524 #ifdef PRINT_EXIT_STATUS
525 if (result == build_ok) {
526 warning(NOCATGETS("I'm in await_dist(), waitflg is true, and result is build_ok."));
527 } else {
528 warning(NOCATGETS("I'm in await_dist(), waitflg is true, and result is build_failed."));
529 }
530 #endif
531
532 } else {
533 for (rp = running_list;
534 (rp != NULL) && (job_msg_id != rp->job_msg_id);
535 rp = rp->next) {
536 }
537 if (rp == NULL) {
538 fatal(catgets(catd, 1, 251, "Internal error: returned child job_msg_id not in running_list"));
539 } else {
540 /* XXX - This may not be correct! */
541 if (result_msg_job_status == RETURNED) {
542 rp->state = build_ok;
543 } else {
544 rp->state = (result_msg_cmd_status == 0) ? build_ok : build_failed;
545 }
546 result = rp->state;
547
548 #ifdef PRINT_EXIT_STATUS
549 if (result == build_ok) {
550 warning(NOCATGETS("I'm in await_dist(), waitflg is false, and result is build_ok."));
551 } else {
552 warning(NOCATGETS("I'm in await_dist(), waitflg is false, and result is build_failed."));
553 }
554 #endif
555
556 }
557 parallel_process_cnt--;
558 }
559 delete dmake_result_msg;
560 return result;
561 }
562
563 #endif
564
565