diff --git a/Cargo.toml b/Cargo.toml index 671bc72..423d96f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "trawler" -version = "0.5.1" +version = "0.5.2" description = "A workload generator that emulates the traffic to lobste.rs" readme = "README.md" @@ -35,7 +35,7 @@ zipf = "1.0.0" hdrhistogram = "6.0.0" histogram-sampler = "0.1.3" -chan = "0.1.21" +crossbeam-channel = "0.1.2" tokio-core = "0.1" futures = "0.1" diff --git a/src/execution/generator.rs b/src/execution/generator.rs index dae3319..2cf78a9 100644 --- a/src/execution/generator.rs +++ b/src/execution/generator.rs @@ -1,6 +1,6 @@ use WorkerCommand; -use chan; use client::{LobstersClient, LobstersRequest, Vote}; +use crossbeam_channel; use execution::{self, id_to_slug, Sampler, MAX_SLUGGABLE_ID}; use rand::{self, Rng}; use std::sync::atomic; @@ -9,7 +9,7 @@ use std::time; pub(super) fn run( load: execution::Workload, sampler: Sampler, - pool: chan::Sender, + pool: crossbeam_channel::Sender, target: f64, ) -> usize where @@ -106,7 +106,8 @@ where }; let issued = next; - pool.send(WorkerCommand::Request(issued, user, req)); + pool.send(WorkerCommand::Request(issued, user, req)) + .unwrap(); ops += 1; // schedule next delivery diff --git a/src/execution/harness.rs b/src/execution/harness.rs index 2108680..be84208 100644 --- a/src/execution/harness.rs +++ b/src/execution/harness.rs @@ -1,7 +1,7 @@ use BASE_OPS_PER_MIN; use WorkerCommand; -use chan; use client::{LobstersClient, LobstersRequest}; +use crossbeam_channel; use execution::{self, id_to_slug, Sampler}; use rand::{self, Rng}; use std::sync::{Arc, Barrier, Mutex}; @@ -34,7 +34,7 @@ where let runtime = load.runtime; let factory = Arc::new(Mutex::new(factory)); - let (pool, jobs) = chan::async(); + let (pool, jobs) = crossbeam_channel::unbounded(); let workers: Vec<_> = (0..nthreads) .map(|i| { let jobs = jobs.clone(); @@ -62,7 +62,8 @@ where // then, log in all the users for u in 0..sampler.nusers() { - pool.send(WorkerCommand::Request(now, Some(u), LobstersRequest::Login)); + pool.send(WorkerCommand::Request(now, Some(u), LobstersRequest::Login)) + .unwrap(); } if prime { @@ -75,7 +76,7 @@ where // it receives one, it can't receive another until the barrier has been passed! Therefore, // sending `nthreads` barriers should ensure that every thread gets one for _ in 0..nthreads { - pool.send(WorkerCommand::Wait(barrier.clone())); + pool.send(WorkerCommand::Wait(barrier.clone())).unwrap(); } barrier.wait(); @@ -90,12 +91,12 @@ where now, Some(sampler.user(&mut rng)), req, - )); + )).unwrap(); } // wait for all threads to finish priming stories for _ in 0..nthreads { - pool.send(WorkerCommand::Wait(barrier.clone())); + pool.send(WorkerCommand::Wait(barrier.clone())).unwrap(); } barrier.wait(); @@ -129,13 +130,13 @@ where now, Some(sampler.user(&mut rng)), req, - )); + )).unwrap(); } // wait for all threads to finish priming comments // the addition of the ::Wait barrier will also ensure that start is (re)set for _ in 0..nthreads { - pool.send(WorkerCommand::Wait(barrier.clone())); + pool.send(WorkerCommand::Wait(barrier.clone())).unwrap(); } barrier.wait(); println!("--> finished priming database"); @@ -143,7 +144,7 @@ where // wait for all threads to be ready (and set their start time correctly) for _ in 0..nthreads { - pool.send(WorkerCommand::Start(barrier.clone())); + pool.send(WorkerCommand::Start(barrier.clone())).unwrap(); } barrier.wait(); diff --git a/src/execution/issuer.rs b/src/execution/issuer.rs index 9fa43ba..11771ab 100644 --- a/src/execution/issuer.rs +++ b/src/execution/issuer.rs @@ -1,6 +1,6 @@ use WorkerCommand; -use chan; use client::LobstersClient; +use crossbeam_channel; use execution::Stats; use futures::Future; use hdrhistogram::Histogram; @@ -16,7 +16,7 @@ pub(super) fn run( max_in_flight: usize, mut core: tokio_core::reactor::Core, client: C, - jobs: chan::Receiver, + jobs: crossbeam_channel::Receiver, ) -> (Stats, Stats) where C: LobstersClient, @@ -30,9 +30,15 @@ where let sjrn = Rc::new(RefCell::new(HashMap::default())); let rmt = Rc::new(RefCell::new(HashMap::default())); - while let Some(cmd) = jobs.recv() { - match cmd { - WorkerCommand::Wait(barrier) => { + loop { + match jobs.try_recv() { + Err(crossbeam_channel::TryRecvError::Disconnected) => break, + Err(crossbeam_channel::TryRecvError::Empty) => { + // TODO: once we have a futures-aware mpmc channel, we won't have to hack like this + // track https://github.com/crossbeam-rs/crossbeam-channel/issues/22 + core.turn(Some(time::Duration::new(0, 0))) + } + Ok(WorkerCommand::Wait(barrier)) => { // when we get a barrier, wait for all pending requests to complete { while *in_flight.borrow_mut() > 0 { @@ -42,7 +48,7 @@ where barrier.wait(); } - WorkerCommand::Start(barrier) => { + Ok(WorkerCommand::Start(barrier)) => { { while *in_flight.borrow_mut() > 0 { core.turn(None); @@ -53,7 +59,7 @@ where // start should be set to the first time after priming has finished start = Some(time::Instant::now()); } - WorkerCommand::Request(issued, user, request) => { + Ok(WorkerCommand::Request(issued, user, request)) => { // ensure we don't have too many requests in flight at the same time { while *in_flight.borrow_mut() >= max_in_flight { diff --git a/src/lib.rs b/src/lib.rs index 681cf63..663b0d5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,7 +22,7 @@ //! Rails application, you must apply the patches in `lobsters.diff` first. #![deny(missing_docs)] -extern crate chan; +extern crate crossbeam_channel; extern crate futures; extern crate hdrhistogram; extern crate histogram_sampler;