1 /*
2 * CDDL HEADER START
3 *
4 * The contents of this file are subject to the terms of the
5 * Common Development and Distribution License (the "License").
6 * You may not use this file except in compliance with the License.
7 *
8 * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9 * or http://www.opensolaris.org/os/licensing.
10 * See the License for the specific language governing permissions
11 * and limitations under the License.
12 *
13 * When distributing Covered Code, include this CDDL HEADER in each
14 * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15 * If applicable, add the following below this CDDL HEADER, with the
16 * fields enclosed by brackets "[]" replaced with your own identifying
17 * information: Portions Copyright [yyyy] [name of copyright owner]
18 *
19 * CDDL HEADER END
20 */
21 /*
22 * Copyright 2004 Sun Microsystems, Inc. All rights reserved.
23 * Use is subject to license terms.
24 */
25
26 #ifdef DISTRIBUTED
27 /*
28 * dist.cc
29 *
30 * Deal with the distributed processing
31 */
32
33 #include <avo/err.h>
34 #include <avo/find_dir.h>
35 #include <avo/util.h>
36 #include <dm/Avo_AcknowledgeMsg.h>
37 #include <dm/Avo_DoJobMsg.h>
38 #include <dm/Avo_JobResultMsg.h>
39 #include <mk/defs.h>
40 #include <mksh/misc.h> /* getmem() */
41 #include <rw/pstream.h>
42 #include <rw/queuecol.h>
43 #include <rw/xdrstrea.h>
44 #include <signal.h>
45 #ifdef linux
46 #include <sstream>
47 using namespace std;
48 #else
49 #include <strstream.h>
50 #endif
51 #include <sys/stat.h> /* stat() */
52 #include <sys/types.h>
53 #include <sys/wait.h>
54 #include <unistd.h>
55 #include <errno.h>
56
57 /*
58 * Defined macros
59 */
60
/*
 * Signal-mask helpers.  Both macros expect two sigset_t locals named
 * `newset' and `oldset' in the calling scope.
 *
 *	AVO_BLOCK_INTERUPTS	fills newset, installs it as the process
 *				signal mask and saves the previous mask
 *				in oldset.
 *	AVO_UNBLOCK_INTERUPTS	reinstalls the mask saved in oldset.
 *
 * Each macro expands to exactly one statement (do { } while (0)), so it
 * behaves correctly as the body of an unbraced if/else.
 */
#define AVO_BLOCK_INTERUPTS \
	do { \
		sigfillset(&newset); \
		sigprocmask(SIG_SETMASK, &newset, &oldset); \
	} while (0)

#define AVO_UNBLOCK_INTERUPTS \
	do { \
		sigprocmask(SIG_SETMASK, &oldset, &newset); \
	} while (0)
66
67
68 /*
69 * typedefs & structs
70 */
71
72 /*
73 * Static variables
74 */
75 int dmake_ifd;
76 FILE* dmake_ifp;
77 XDR xdrs_in;
78
79 int dmake_ofd;
80 FILE* dmake_ofp;
81 XDR xdrs_out;
82
83 // These instances are required for the RWfactory.
84 static Avo_JobResultMsg dummyJobResultMsg;
85 static Avo_AcknowledgeMsg dummyAcknowledgeMsg;
86 static int firstAcknowledgeReceived = 0;
87
88 int rxmPid = 0;
89
90 /*
91 * File table of contents
92 */
93 static void set_dmake_env_vars(void);
94
95 /*
96 * void
97 * startup_rxm(void)
98 *
99 * When startup_rxm() is called, read_command_options() and
100 * read_files_and_state() have already been read, so DMake
101 * will now know what options to pass to rxm.
102 *
103 * rxm
104 * [ -k ] [ -n ]
105 * [ -c <dmake_rcfile> ]
106 * [ -g <dmake_group> ]
107 * [ -j <dmake_max_jobs> ]
108 * [ -m <dmake_mode> ]
109 * [ -o <dmake_odir> ]
110 * <read_fd> <write_fd>
111 *
112 * rxm will, among other things, read the rc file.
113 *
114 */
115 void
116 startup_rxm(void)
117 {
118 Name dmake_name;
119 Name dmake_value;
120 Avo_err *find_run_dir_err;
121 int pipe1[2], pipe2[2];
122 Property prop;
123 char *run_dir;
124 char rxm_command[MAXPATHLEN];
125 int rxm_debug = 0;
126
127 int length;
128 char * env;
129
130 firstAcknowledgeReceived = 0;
131 /*
132 * Create two pipes, one for dmake->rxm, and one for rxm->dmake.
133 * pipe1 is dmake->rxm,
134 * pipe2 is rxm->dmake.
135 */
136 if ((pipe(pipe1) < 0) || (pipe(pipe2) < 0)) {
137 fatal(catgets(catd, 1, 245, "pipe() failed: %s"), errmsg(errno));
138 }
139
140 set_dmake_env_vars();
141
142 if ((rxmPid = fork()) < 0) { /* error */
143 fatal(catgets(catd, 1, 246, "fork() failed: %s"), errmsg(errno));
144 } else if (rxmPid > 0) { /* parent, dmake */
145 dmake_ofd = pipe1[1]; // write side of pipe
146 if (!(dmake_ofp = fdopen(dmake_ofd, "a"))) {
147 fatal(catgets(catd, 1, 247, "fdopen() failed: %s"), errmsg(errno));
148 }
149 xdrstdio_create(&xdrs_out, dmake_ofp, XDR_ENCODE);
150
151 dmake_ifd = pipe2[0]; // read side of pipe
152 if (!(dmake_ifp = fdopen(dmake_ifd, "r"))) {
153 fatal(catgets(catd, 1, 248, "fdopen() failed: %s"), errmsg(errno));
154 }
155 xdrstdio_create(&xdrs_in, dmake_ifp, XDR_DECODE);
156
157 close(pipe1[0]); // read side
158 close(pipe2[1]); // write side
159 } else { /* child, rxm */
160 close(pipe1[1]); // write side
161 close(pipe2[0]); // read side
162
163 /* Find the run directory of dmake, for rxm. */
164 find_run_dir_err = avo_find_run_dir(&run_dir);
165 if (find_run_dir_err) {
166 delete find_run_dir_err;
167 /* Use the path to find rxm. */
168 (void) sprintf(rxm_command, NOCATGETS("rxm"));
169 } else {
170 /* Use the run dir of dmake for rxm. */
171 (void) sprintf(rxm_command, NOCATGETS("%s/rxm"), run_dir);
172 }
173
174 if (continue_after_error) {
175 (void) strcat(rxm_command, NOCATGETS(" -k"));
176 }
177 if (do_not_exec_rule) {
178 (void) strcat(rxm_command, NOCATGETS(" -n"));
179 }
180 if (rxm_debug) {
181 (void) strcat(rxm_command, NOCATGETS(" -S"));
182 }
183 if (send_mtool_msgs) {
184 (void) sprintf(&rxm_command[strlen(rxm_command)],
185 NOCATGETS(" -O %d"),
186 mtool_msgs_fd);
187 }
188 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_RCFILE"));
189 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
190 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
191 ((dmake_value = prop->body.macro.value) != NULL)) {
192 (void) sprintf(&rxm_command[strlen(rxm_command)],
193 NOCATGETS(" -c %s"),
194 dmake_value->string_mb);
195 } else {
196 length = 2 + strlen(NOCATGETS("DMAKE_RCFILE"));
197 env = getmem(length);
198 (void) sprintf(env,
199 "%s=",
200 NOCATGETS("DMAKE_RCFILE"));
201 (void) putenv(env);
202 }
203 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_GROUP"));
204 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
205 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
206 ((dmake_value = prop->body.macro.value) != NULL)) {
207 (void) sprintf(&rxm_command[strlen(rxm_command)],
208 NOCATGETS(" -g %s"),
209 dmake_value->string_mb);
210 } else {
211 length = 2 + strlen(NOCATGETS("DMAKE_GROUP"));
212 env = getmem(length);
213 (void) sprintf(env,
214 "%s=",
215 NOCATGETS("DMAKE_GROUP"));
216 (void) putenv(env);
217 }
218 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_MAX_JOBS"));
219 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
220 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
221 ((dmake_value = prop->body.macro.value) != NULL)) {
222 (void) sprintf(&rxm_command[strlen(rxm_command)],
223 NOCATGETS(" -j %s"),
224 dmake_value->string_mb);
225 } else {
226 length = 2 + strlen(NOCATGETS("DMAKE_MAX_JOBS"));
227 env = getmem(length);
228 (void) sprintf(env,
229 "%s=",
230 NOCATGETS("DMAKE_MAX_JOBS"));
231 (void) putenv(env);
232 }
233 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_MODE"));
234 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
235 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
236 ((dmake_value = prop->body.macro.value) != NULL)) {
237 (void) sprintf(&rxm_command[strlen(rxm_command)],
238 NOCATGETS(" -m %s"),
239 dmake_value->string_mb);
240 } else {
241 length = 2 + strlen(NOCATGETS("DMAKE_MODE"));
242 env = getmem(length);
243 (void) sprintf(env,
244 "%s=",
245 NOCATGETS("DMAKE_MODE"));
246 (void) putenv(env);
247 }
248 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_ODIR"));
249 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
250 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
251 ((dmake_value = prop->body.macro.value) != NULL)) {
252 (void) sprintf(&rxm_command[strlen(rxm_command)],
253 NOCATGETS(" -o %s"),
254 dmake_value->string_mb);
255 } else {
256 length = 2 + strlen(NOCATGETS("DMAKE_ODIR"));
257 env = getmem(length);
258 (void) sprintf(env,
259 "%s=",
260 NOCATGETS("DMAKE_ODIR"));
261 (void) putenv(env);
262 }
263
264 (void) sprintf(&rxm_command[strlen(rxm_command)],
265 NOCATGETS(" %d %d"),
266 pipe1[0], pipe2[1]);
267 #ifdef linux
268 execl(NOCATGETS("/bin/sh"),
269 #else
270 execl(NOCATGETS("/usr/bin/sh"),
271 #endif
272 NOCATGETS("sh"),
273 NOCATGETS("-c"),
274 rxm_command,
275 (char *)NULL);
276 _exit(127);
277 }
278 }
279
280 /*
281 * static void
282 * set_dmake_env_vars()
283 *
284 * Sets the DMAKE_* environment variables for rxm and rxs.
285 * DMAKE_PWD
286 * DMAKE_NPWD
287 * DMAKE_UMASK
288 * DMAKE_SHELL
289 */
290 static void
291 set_dmake_env_vars()
292 {
293 char *current_netpath;
294 char *current_path;
295 static char *env;
296 int length;
297 char netpath[MAXPATHLEN];
298 mode_t um;
299 char um_buf[MAXPATHLEN];
300 Name dmake_name;
301 Name dmake_value;
302 Property prop;
303
304 #ifdef REDIRECT_ERR
305 /* Set __DMAKE_REDIRECT_STDERR */
306 length = 2 + strlen(NOCATGETS("__DMAKE_REDIRECT_STDERR")) + 1;
307 env = getmem(length);
308 (void) sprintf(env,
309 "%s=%s",
310 NOCATGETS("__DMAKE_REDIRECT_STDERR"),
311 out_err_same ? NOCATGETS("0") : NOCATGETS("1"));
312 (void) putenv(env);
313 #endif
314
315 /* Set DMAKE_PWD to the current working directory */
316 current_path = get_current_path();
317 length = 2 + strlen(NOCATGETS("DMAKE_PWD")) + strlen(current_path);
318 env = getmem(length);
319 (void) sprintf(env,
320 "%s=%s",
321 NOCATGETS("DMAKE_PWD"),
322 current_path);
323 (void) putenv(env);
324
325 /* Set DMAKE_NPWD to machine:pathname */
326 if (avo_path_to_netpath(current_path, netpath)) {
327 current_netpath = netpath;
328 } else {
329 current_netpath = current_path;
330 }
331 length = 2 + strlen(NOCATGETS("DMAKE_NPWD")) + strlen(current_netpath);
332 env = getmem(length);
333 (void) sprintf(env,
334 "%s=%s",
335 NOCATGETS("DMAKE_NPWD"),
336 current_netpath);
337 (void) putenv(env);
338
339 /* Set DMAKE_UMASK to the value of umask */
340 um = umask(0);
341 umask(um);
342 (void) sprintf(um_buf, NOCATGETS("%ul"), um);
343 length = 2 + strlen(NOCATGETS("DMAKE_UMASK")) + strlen(um_buf);
344 env = getmem(length);
345 (void) sprintf(env,
346 "%s=%s",
347 NOCATGETS("DMAKE_UMASK"),
348 um_buf);
349 (void) putenv(env);
350
351 if (((prop = get_prop(shell_name->prop, macro_prop)) != NULL) &&
352 ((dmake_value = prop->body.macro.value) != NULL)) {
353 length = 2 + strlen(NOCATGETS("DMAKE_SHELL")) +
354 strlen(dmake_value->string_mb);
355 env = getmem(length);
356 (void) sprintf(env,
357 "%s=%s",
358 NOCATGETS("DMAKE_SHELL"),
359 dmake_value->string_mb);
360 (void) putenv(env);
361 }
362 MBSTOWCS(wcs_buffer, NOCATGETS("DMAKE_OUTPUT_MODE"));
363 dmake_name = GETNAME(wcs_buffer, FIND_LENGTH);
364 if (((prop = get_prop(dmake_name->prop, macro_prop)) != NULL) &&
365 ((dmake_value = prop->body.macro.value) != NULL)) {
366 length = 2 + strlen(NOCATGETS("DMAKE_OUTPUT_MODE")) +
367 strlen(dmake_value->string_mb);
368 env = getmem(length);
369 (void) sprintf(env,
370 "%s=%s",
371 NOCATGETS("DMAKE_OUTPUT_MODE"),
372 dmake_value->string_mb);
373 (void) putenv(env);
374 }
375 }
376
377 /*
378 * void
379 * distribute_rxm(Avo_DoJobMsg *dmake_job_msg)
380 *
381 * Write the DMake rule to be distributed down the pipe to rxm.
382 *
383 */
384 void
385 distribute_rxm(Avo_DoJobMsg *dmake_job_msg)
386 {
387 /* Add all dynamic env vars to the dmake_job_msg. */
388 setvar_envvar(dmake_job_msg);
389
390 /*
391 * Copying dosys()...
392 * Stat .make.state to see if we'll need to reread it later
393 */
394 make_state->stat.time = file_no_time;
395 (void)exists(make_state);
396 make_state_before = make_state->stat.time;
397
398 // Wait for the first Acknowledge message from the rxm process
399 // before sending the first message.
400 if (!firstAcknowledgeReceived) {
401 firstAcknowledgeReceived++;
402 Avo_AcknowledgeMsg *msg = getAcknowledgeMsg();
403 if (msg) {
404 delete msg;
405 }
406 }
407
408 RWCollectable *doJobMsg = (RWCollectable *)dmake_job_msg;
409 sigset_t newset;
410 sigset_t oldset;
411
412 AVO_BLOCK_INTERUPTS;
413 int xdrResult = xdr(&xdrs_out, doJobMsg);
414
415 if (xdrResult) {
416 fflush(dmake_ofp);
417 AVO_UNBLOCK_INTERUPTS;
418 } else {
419 AVO_UNBLOCK_INTERUPTS;
420 fatal(catgets(catd, 1, 249, "Couldn't send the job request to rxm"));
421 }
422
423 delete dmake_job_msg;
424 }
425
426 // Queue for JobResult messages.
427 static RWSlistCollectablesQueue jobResultQueue;
428
429 // Queue for Acknowledge messages.
430 static RWSlistCollectablesQueue acknowledgeQueue;
431
432 // Read a message from the rxm process, and put it into a queue, by
433 // message type. Return the message type.
434
435 int
436 getRxmMessage(void)
437 {
438 RWCollectable *msg = (RWCollectable *)0;
439 int msgType = 0;
440 // sigset_t newset;
441 // sigset_t oldset;
442
443 // It seems unnecessarily to block interrupts here because
444 // any nonignored signal means exit for dmake in distributed mode.
445 // AVO_BLOCK_INTERUPTS;
446 int xdrResult = xdr(&xdrs_in, msg);
447 // AVO_UNBLOCK_INTERUPTS;
448
449 if (xdrResult) {
450 switch(msg->isA()) {
451 case __AVO_ACKNOWLEDGEMSG:
452 acknowledgeQueue.append(msg);
453 msgType = __AVO_ACKNOWLEDGEMSG;
454 break;
455 case __AVO_JOBRESULTMSG:
456 jobResultQueue.append(msg);
457 msgType = __AVO_JOBRESULTMSG;
458 break;
459 default:
460 warning(catgets(catd, 1, 291, "Unknown message on rxm input fd"));
461 msgType = 0;
462 break;
463 }
464 } else {
465 if (errno == EINTR) {
466 fputs(NOCATGETS("dmake: Internal error: xdr() has been interrupted by a signal.\n"), stderr);
467 }
468 fatal(catgets(catd, 1, 250, "Couldn't receive message from rxm"));
469 }
470
471 return msgType;
472 }
473
474 // Get a JobResult message from it's queue, and
475 // if the queue is empty, call the getRxmMessage() function until
476 // a JobResult message shows.
477
478 Avo_JobResultMsg *
479 getJobResultMsg(void)
480 {
481 RWCollectable *msg = 0;
482
483 if (!(msg = jobResultQueue.get())) {
484 while (getRxmMessage() != __AVO_JOBRESULTMSG);
485 msg = jobResultQueue.get();
486 }
487
488 return (Avo_JobResultMsg *)msg;
489 }
490
491 // Get an Acknowledge message from it's queue, and
492 // if the queue is empty, call the getRxmMessage() function until
493 // a Acknowledge message shows.
494
495 Avo_AcknowledgeMsg *
496 getAcknowledgeMsg(void)
497 {
498 RWCollectable *msg = 0;
499
500 if (!(msg = acknowledgeQueue.get())) {
501 while (getRxmMessage() != __AVO_ACKNOWLEDGEMSG);
502 msg = acknowledgeQueue.get();
503 }
504
505 return (Avo_AcknowledgeMsg *)msg;
506 }
507
508
509 /*
510 * Doname
511 * await_dist(Boolean waitflg)
512 *
513 * Waits for distributed children to exit and finishes their processing.
514 * Rxm will send a msg down the pipe when a child is done.
515 *
516 */
517 Doname
518 await_dist(Boolean waitflg)
519 {
520 Avo_JobResultMsg *dmake_result_msg;
521 int job_msg_id;
522 Doname result = build_ok;
523 int result_msg_cmd_status;
524 int result_msg_job_status;
525 Running rp;
526
527 while (!(dmake_result_msg = getJobResultMsg()));
528 job_msg_id = dmake_result_msg->getId();
529 result_msg_cmd_status = dmake_result_msg->getCmdStatus();
530 result_msg_job_status = dmake_result_msg->getJobStatus();
531
532 if (waitflg) {
533 result = (result_msg_cmd_status == 0) ? build_ok : build_failed;
534
535 #ifdef PRINT_EXIT_STATUS
536 if (result == build_ok) {
537 warning(NOCATGETS("I'm in await_dist(), waitflg is true, and result is build_ok."));
538 } else {
539 warning(NOCATGETS("I'm in await_dist(), waitflg is true, and result is build_failed."));
540 }
541 #endif
542
543 } else {
544 for (rp = running_list;
545 (rp != NULL) && (job_msg_id != rp->job_msg_id);
546 rp = rp->next) {
547 }
548 if (rp == NULL) {
549 fatal(catgets(catd, 1, 251, "Internal error: returned child job_msg_id not in running_list"));
550 } else {
551 /* XXX - This may not be correct! */
552 if (result_msg_job_status == RETURNED) {
553 rp->state = build_ok;
554 } else {
555 rp->state = (result_msg_cmd_status == 0) ? build_ok : build_failed;
556 }
557 result = rp->state;
558
559 #ifdef PRINT_EXIT_STATUS
560 if (result == build_ok) {
561 warning(NOCATGETS("I'm in await_dist(), waitflg is false, and result is build_ok."));
562 } else {
563 warning(NOCATGETS("I'm in await_dist(), waitflg is false, and result is build_failed."));
564 }
565 #endif
566
567 }
568 parallel_process_cnt--;
569 }
570 delete dmake_result_msg;
571 return result;
572 }
573
574 #endif
575
576