Skip to content

Commit

Permalink
Make mock test server emit timing data (pantsbuild#5891)
Browse files Browse the repository at this point in the history
  • Loading branch information
wisechengyi authored and illicitonion committed Jun 5, 2018
1 parent f6918ff commit 4c7425d
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 25 deletions.
47 changes: 29 additions & 18 deletions src/rust/engine/process_execution/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::error::Error;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use std::time::{Duration, Instant};

use bazel_protos;
use boxfuture::{BoxFuture, Boxable};
Expand Down Expand Up @@ -84,7 +84,7 @@ impl super::CommandRunner for CommandRunner {
.map(|result| (Arc::new(execute_request), result))
})
.and_then(move |(execute_request, operation)| {
let start_time = SystemTime::now();
let start_time = Instant::now();

future::loop_fn((operation, 0), move |(operation, iter_num)| {
let req_description = req_description.clone();
Expand Down Expand Up @@ -128,17 +128,8 @@ impl super::CommandRunner for CommandRunner {
(1 + iter_num) * CommandRunner::BACKOFF_INCR_WAIT_MILLIS,
);

let grpc_result =
map_grpc_result(operations_client.get().get_operation(&operation_request));

let operation = try_future!(grpc_result);

// take the grpc result and cancel the op if too much time has passed.
let elapsed = try_future!(
start_time
.elapsed()
.map_err(|err| format!("Something weird happened with time {:?}", err))
);
let elapsed = start_time.elapsed();

if elapsed > req_timeout {
future::err(format!(
Expand All @@ -155,7 +146,12 @@ impl super::CommandRunner for CommandRunner {
)
})
.and_then(move |_| {
future::ok(future::Loop::Continue((operation, iter_num + 1))).to_boxed()
future::done(map_grpc_result(
operations_client.get().get_operation(&operation_request),
)).map(move |operation| {
future::Loop::Continue((operation, iter_num + 1))
})
.to_boxed()
})
.to_boxed()
}
Expand Down Expand Up @@ -636,7 +632,8 @@ mod tests {
use std::iter::{self, FromIterator};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use std::time::Duration;
use std::ops::Sub;

#[derive(Debug, PartialEq)]
enum StdoutType {
Expand Down Expand Up @@ -1406,9 +1403,13 @@ mod tests {
],
))
};
let start_time = SystemTime::now();
run_command_remote(mock_server.address(), execute_request).unwrap();
assert!(start_time.elapsed().unwrap() >= Duration::from_millis(500));

let messages = mock_server.mock_responder.received_messages.lock().unwrap();
assert!(messages.len() == 2);
assert!(
messages.get(1).unwrap().2.sub(messages.get(0).unwrap().2) >= Duration::from_millis(500)
);
}
}

Expand All @@ -1435,9 +1436,19 @@ mod tests {
],
))
};
let start_time = SystemTime::now();
run_command_remote(mock_server.address(), execute_request).unwrap();
assert!(start_time.elapsed().unwrap() >= Duration::from_millis(3000));

let messages = mock_server.mock_responder.received_messages.lock().unwrap();
assert!(messages.len() == 4);
assert!(
messages.get(1).unwrap().2.sub(messages.get(0).unwrap().2) >= Duration::from_millis(500)
);
assert!(
messages.get(2).unwrap().2.sub(messages.get(1).unwrap().2) >= Duration::from_millis(1000)
);
assert!(
messages.get(3).unwrap().2.sub(messages.get(2).unwrap().2) >= Duration::from_millis(1500)
);
}
}

Expand Down
15 changes: 8 additions & 7 deletions src/rust/engine/testutil/mock/src/execution_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::ops::Deref;
use std::thread::sleep;
use std::time::Duration;
use std::sync::{Arc, Mutex};
use std::time::Instant;

use bazel_protos;
use grpcio;
Expand Down Expand Up @@ -45,7 +46,7 @@ impl MockExecution {
/// responses.
///
pub struct TestServer {
mock_responder: MockResponder,
pub mock_responder: MockResponder,
server_transport: grpcio::Server,
}

Expand Down Expand Up @@ -127,7 +128,7 @@ impl Drop for TestServer {
#[derive(Clone, Debug)]
pub struct MockResponder {
mock_execution: MockExecution,
received_messages: Arc<Mutex<Vec<(String, Box<protobuf::Message>)>>>,
pub received_messages: Arc<Mutex<Vec<(String, Box<protobuf::Message>, Instant)>>>,
}

impl MockResponder {
Expand All @@ -139,11 +140,11 @@ impl MockResponder {
}

fn log<T: protobuf::Message + Sized>(&self, message: T) {
self
.received_messages
.lock()
.unwrap()
.push((message.descriptor().name().to_string(), Box::new(message)));
self.received_messages.lock().unwrap().push((
message.descriptor().name().to_string(),
Box::new(message),
Instant::now(),
));
}

fn display_all<D: Debug>(items: &Vec<D>) -> String {
Expand Down

0 comments on commit 4c7425d

Please sign in to comment.