-
Notifications
You must be signed in to change notification settings - Fork 9
/
issuer.rs
131 lines (120 loc) · 4.71 KB
/
issuer.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
use WorkerCommand;
use client::LobstersClient;
use crossbeam_channel;
use execution::Stats;
use futures::Future;
use hdrhistogram::Histogram;
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use std::{mem, time};
use tokio_core;
pub(super) fn run<C>(
warmup: time::Duration,
runtime: time::Duration,
max_in_flight: usize,
mut core: tokio_core::reactor::Core,
client: C,
jobs: crossbeam_channel::Receiver<WorkerCommand>,
) -> (Stats, Stats)
where
C: LobstersClient,
{
let mut start = None;
let client = Rc::new(client);
let in_flight = Rc::new(RefCell::new(0));
let handle = core.handle();
let end = warmup + runtime;
let sjrn = Rc::new(RefCell::new(HashMap::default()));
let rmt = Rc::new(RefCell::new(HashMap::default()));
loop {
match jobs.try_recv() {
Err(crossbeam_channel::TryRecvError::Disconnected) => break,
Err(crossbeam_channel::TryRecvError::Empty) => {
// TODO: once we have a futures-aware mpmc channel, we won't have to hack like this
// track https://github.com/crossbeam-rs/crossbeam-channel/issues/22
core.turn(Some(time::Duration::new(0, 0)))
}
Ok(WorkerCommand::Wait(barrier)) => {
// when we get a barrier, wait for all pending requests to complete
{
while *in_flight.borrow_mut() > 0 {
core.turn(None);
}
}
barrier.wait();
}
Ok(WorkerCommand::Start(barrier)) => {
{
while *in_flight.borrow_mut() > 0 {
core.turn(None);
}
}
barrier.wait();
// start should be set to the first time after priming has finished
start = Some(time::Instant::now());
}
Ok(WorkerCommand::Request(issued, user, request)) => {
// ensure we don't have too many requests in flight at the same time
{
while *in_flight.borrow_mut() >= max_in_flight {
core.turn(None);
}
*in_flight.borrow_mut() += 1;
}
let in_flight = in_flight.clone();
let sjrn = sjrn.clone();
let rmt = rmt.clone();
let variant = mem::discriminant(&request);
handle.spawn(
C::handle(client.clone(), user, request).then(move |remote_t| {
*in_flight.borrow_mut() -= 1;
if start.is_none() {
return Ok(());
}
let start = start.unwrap();
if remote_t.is_ok() && issued.duration_since(start) > warmup {
let remote_t = remote_t.unwrap();
let sjrn_t = issued.elapsed();
rmt.borrow_mut()
.entry(variant)
.or_insert_with(|| {
Histogram::<u64>::new_with_bounds(1, 10_000, 4).unwrap()
})
.saturating_record(
remote_t.as_secs() * 1_000
+ remote_t.subsec_nanos() as u64 / 1_000_000,
);
sjrn.borrow_mut()
.entry(variant)
.or_insert_with(|| {
Histogram::<u64>::new_with_bounds(1, 10_000, 4).unwrap()
})
.saturating_record(
sjrn_t.as_secs() * 1_000
+ sjrn_t.subsec_nanos() as u64 / 1_000_000,
);
}
Ok(())
}),
);
}
}
if let Some(start) = start {
if start.elapsed() > end {
// we're past the end of the experiments and should exit cleanly.
// ignore anything left in the queue, and just finish up our current work.
while *in_flight.borrow_mut() > 0 {
core.turn(None);
}
break;
}
}
}
let mut sjrn = sjrn.borrow_mut();
let mut rmt = rmt.borrow_mut();
(
mem::replace(&mut *sjrn, HashMap::default()),
mem::replace(&mut *rmt, HashMap::default()),
)
}