Skip to content

Commit

Permalink
Continue to do work while waiting for cmds
Browse files Browse the repository at this point in the history
Issuers are less efficient than we'd like since we now need to
alternate between turning the core and trying to receive new jobs. This
will be fixed once the crossbeam-channel crate is futures-aware:

crossbeam-rs/crossbeam-channel#22
  • Loading branch information
jonhoo committed Mar 29, 2018
1 parent 9e70f00 commit 17d4f1e
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 22 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "trawler"
version = "0.5.1"
version = "0.5.2"

description = "A workload generator that emulates the traffic to lobste.rs"
readme = "README.md"
Expand Down Expand Up @@ -35,7 +35,7 @@ zipf = "1.0.0"
hdrhistogram = "6.0.0"
histogram-sampler = "0.1.3"

chan = "0.1.21"
crossbeam-channel = "0.1.2"
tokio-core = "0.1"
futures = "0.1"

Expand Down
7 changes: 4 additions & 3 deletions src/execution/generator.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use WorkerCommand;
use chan;
use client::{LobstersClient, LobstersRequest, Vote};
use crossbeam_channel;
use execution::{self, id_to_slug, Sampler, MAX_SLUGGABLE_ID};
use rand::{self, Rng};
use std::sync::atomic;
Expand All @@ -9,7 +9,7 @@ use std::time;
pub(super) fn run<C>(
load: execution::Workload,
sampler: Sampler,
pool: chan::Sender<WorkerCommand>,
pool: crossbeam_channel::Sender<WorkerCommand>,
target: f64,
) -> usize
where
Expand Down Expand Up @@ -106,7 +106,8 @@ where
};

let issued = next;
pool.send(WorkerCommand::Request(issued, user, req));
pool.send(WorkerCommand::Request(issued, user, req))
.unwrap();
ops += 1;

// schedule next delivery
Expand Down
19 changes: 10 additions & 9 deletions src/execution/harness.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use BASE_OPS_PER_MIN;
use WorkerCommand;
use chan;
use client::{LobstersClient, LobstersRequest};
use crossbeam_channel;
use execution::{self, id_to_slug, Sampler};
use rand::{self, Rng};
use std::sync::{Arc, Barrier, Mutex};
Expand Down Expand Up @@ -34,7 +34,7 @@ where
let runtime = load.runtime;

let factory = Arc::new(Mutex::new(factory));
let (pool, jobs) = chan::async();
let (pool, jobs) = crossbeam_channel::unbounded();
let workers: Vec<_> = (0..nthreads)
.map(|i| {
let jobs = jobs.clone();
Expand Down Expand Up @@ -62,7 +62,8 @@ where

// then, log in all the users
for u in 0..sampler.nusers() {
pool.send(WorkerCommand::Request(now, Some(u), LobstersRequest::Login));
pool.send(WorkerCommand::Request(now, Some(u), LobstersRequest::Login))
.unwrap();
}

if prime {
Expand All @@ -75,7 +76,7 @@ where
// it receives one, it can't receive another until the barrier has been passed! Therefore,
// sending `nthreads` barriers should ensure that every thread gets one
for _ in 0..nthreads {
pool.send(WorkerCommand::Wait(barrier.clone()));
pool.send(WorkerCommand::Wait(barrier.clone())).unwrap();
}
barrier.wait();

Expand All @@ -90,12 +91,12 @@ where
now,
Some(sampler.user(&mut rng)),
req,
));
)).unwrap();
}

// wait for all threads to finish priming stories
for _ in 0..nthreads {
pool.send(WorkerCommand::Wait(barrier.clone()));
pool.send(WorkerCommand::Wait(barrier.clone())).unwrap();
}
barrier.wait();

Expand Down Expand Up @@ -129,21 +130,21 @@ where
now,
Some(sampler.user(&mut rng)),
req,
));
)).unwrap();
}

// wait for all threads to finish priming comments
// the addition of the ::Wait barrier will also ensure that start is (re)set
for _ in 0..nthreads {
pool.send(WorkerCommand::Wait(barrier.clone()));
pool.send(WorkerCommand::Wait(barrier.clone())).unwrap();
}
barrier.wait();
println!("--> finished priming database");
}

// wait for all threads to be ready (and set their start time correctly)
for _ in 0..nthreads {
pool.send(WorkerCommand::Start(barrier.clone()));
pool.send(WorkerCommand::Start(barrier.clone())).unwrap();
}
barrier.wait();

Expand Down
20 changes: 13 additions & 7 deletions src/execution/issuer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use WorkerCommand;
use chan;
use client::LobstersClient;
use crossbeam_channel;
use execution::Stats;
use futures::Future;
use hdrhistogram::Histogram;
Expand All @@ -16,7 +16,7 @@ pub(super) fn run<C>(
max_in_flight: usize,
mut core: tokio_core::reactor::Core,
client: C,
jobs: chan::Receiver<WorkerCommand>,
jobs: crossbeam_channel::Receiver<WorkerCommand>,
) -> (Stats, Stats)
where
C: LobstersClient,
Expand All @@ -30,9 +30,15 @@ where
let sjrn = Rc::new(RefCell::new(HashMap::default()));
let rmt = Rc::new(RefCell::new(HashMap::default()));

while let Some(cmd) = jobs.recv() {
match cmd {
WorkerCommand::Wait(barrier) => {
loop {
match jobs.try_recv() {
Err(crossbeam_channel::TryRecvError::Disconnected) => break,
Err(crossbeam_channel::TryRecvError::Empty) => {
// TODO: once we have a futures-aware mpmc channel, we won't have to hack like this
// track https://github.com/crossbeam-rs/crossbeam-channel/issues/22
core.turn(Some(time::Duration::new(0, 0)))
}
Ok(WorkerCommand::Wait(barrier)) => {
// when we get a barrier, wait for all pending requests to complete
{
while *in_flight.borrow_mut() > 0 {
Expand All @@ -42,7 +48,7 @@ where

barrier.wait();
}
WorkerCommand::Start(barrier) => {
Ok(WorkerCommand::Start(barrier)) => {
{
while *in_flight.borrow_mut() > 0 {
core.turn(None);
Expand All @@ -53,7 +59,7 @@ where
// start should be set to the first time after priming has finished
start = Some(time::Instant::now());
}
WorkerCommand::Request(issued, user, request) => {
Ok(WorkerCommand::Request(issued, user, request)) => {
// ensure we don't have too many requests in flight at the same time
{
while *in_flight.borrow_mut() >= max_in_flight {
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
//! Rails application, you must apply the patches in `lobsters.diff` first.
#![deny(missing_docs)]

extern crate chan;
extern crate crossbeam_channel;
extern crate futures;
extern crate hdrhistogram;
extern crate histogram_sampler;
Expand Down

0 comments on commit 17d4f1e

Please sign in to comment.