This repository has been archived by the owner on Jul 11, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 36
/
exec_inputs.rs
79 lines (70 loc) · 2.88 KB
/
exec_inputs.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
use arguments::{self, JOBLOG};
use execute::command;
use input_iterator::InputsLock;
use shell;
use time::Timespec;
use verbose;
use super::job_log::JobLog;
use super::pipe::disk::State;
use super::child::handle_child;
use std::u16;
use std::time::Duration;
use std::io::{self, Read, Write};
use std::sync::mpsc::Sender;
/// State required by a worker that treats each input line as a complete command
/// and executes those commands one at a time via [`ExecInputs::run`].
pub struct ExecInputs<IO>
where
    IO: Read,
{
    /// Total number of inputs; forwarded to the verbose progress messages.
    pub num_inputs: usize,
    /// Timeout handed to the child handler. A zero duration is treated by `run`
    /// as "no timeout requested".
    pub timeout: Duration,
    /// Input source from which `run` pulls the next job ID and command text.
    pub inputs: InputsLock<IO>,
    /// Channel on which error and job-log `State` messages are sent.
    pub output_tx: Sender<State>,
    /// Temporary directory path that is passed through to the child handler.
    pub tempdir: String,
}
impl<IO: Read> ExecInputs<IO> {
pub fn run(&mut self, mut flags: u16) {
let stdout = io::stdout();
let stderr = io::stderr();
let has_timeout = self.timeout != Duration::from_millis(0);
let mut input = String::with_capacity(64);
let mut id_buffer = [0u8; 20];
while let Some(job_id) = self.inputs.try_next(&mut input) {
if flags & arguments::VERBOSE_MODE != 0 {
verbose::processing_task(&stdout, job_id+1, self.num_inputs, &input);
}
// Checks the current command to determine if a shell will be required.
if shell::required(shell::Kind::Input(&input)) {
flags |= arguments::SHELL_ENABLED;
} else {
flags &= u16::MAX ^ arguments::SHELL_ENABLED;
}
let (start_time, end_time, exit_value, signal) = match command::get_command_output(&input, flags) {
Ok(child) => {
handle_child(child, &self.output_tx, flags, job_id, input.clone(), has_timeout, self.timeout,
&self.tempdir, &mut id_buffer)
},
Err(why) => {
let mut stderr = stderr.lock();
let _ = write!(&mut stderr, "parallel: command error: {}: {}\n", input, why);
let message = format!("{}: {}: {}\n", job_id, input, why);
let _ = self.output_tx.send(State::Error(job_id, message));
(Timespec::new(0, 0), Timespec::new(0, 0), -1, 0)
}
};
if flags & JOBLOG != 0 {
let runtime = end_time - start_time;
let _ = self.output_tx.send(State::JobLog(JobLog {
job_id: job_id,
start_time: start_time,
runtime: runtime.num_nanoseconds().unwrap_or(0) as u64,
exit_value: exit_value,
signal: signal,
flags: flags,
command: input.clone(),
}));
}
if flags & arguments::VERBOSE_MODE != 0 {
verbose::task_complete(&stdout, job_id, self.num_inputs, &input);
}
}
}
}