Skip to content

Commit

Permalink
Merge pull request #43 from smol-rs/notgull/evl-v3.0.0
Browse files Browse the repository at this point in the history
Bump to event-listener v3.0.0
  • Loading branch information
notgull committed Sep 17, 2023
2 parents c76dc08 + 3a82590 commit c788964
Show file tree
Hide file tree
Showing 11 changed files with 683 additions and 623 deletions.
11 changes: 8 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,18 @@ jobs:
- name: Install Rust
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
- run: rustup target add wasm32-unknown-unknown
- name: Install WASM Test Tools
uses: taiki-e/install-action@wasm-pack
- name: Install WASM Test Tools and Cargo Hack
uses: taiki-e/install-action@v2
with:
tool: cargo-hack,wasm-pack
- name: Run cargo check
run: cargo check --all --all-features --all-targets
- run: cargo check --all --no-default-features
- name: Run cargo check (without dev-dependencies to catch missing feature flags)
if: startsWith(matrix.rust, 'nightly')
run: cargo check -Z features=dev_dep
- run: rustup target add thumbv7m-none-eabi
- run: cargo hack build --all --target thumbv7m-none-eabi --no-default-features --no-dev-deps
- name: Run cargo check for WASM
run: cargo check --all --all-features --all-targets --target wasm32-unknown-unknown
- name: Test WASM
Expand All @@ -57,7 +62,7 @@ jobs:
matrix:
# When updating this, the reminder to update the minimum supported
# Rust version in Cargo.toml.
rust: ['1.48']
rust: ['1.59']
steps:
- uses: actions/checkout@v4
- name: Install Rust
Expand Down
10 changes: 8 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ name = "async-lock"
version = "2.8.0"
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
edition = "2018"
rust-version = "1.48"
rust-version = "1.59"
description = "Async synchronization primitives"
license = "Apache-2.0 OR MIT"
repository = "https://github.com/smol-rs/async-lock"
Expand All @@ -15,7 +15,13 @@ categories = ["asynchronous", "concurrency"]
exclude = ["/.*"]

[dependencies]
event-listener = "2.5.1"
event-listener = { version = "3.0.0", default-features = false }
event-listener-strategy = { version = "0.2.0", default-features = false }
pin-project-lite = "0.2.11"

[features]
default = ["std"]
std = ["event-listener/std", "event-listener-strategy/std"]

[dev-dependencies]
async-channel = "1.5.0"
Expand Down
77 changes: 41 additions & 36 deletions src/barrier.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use event_listener::{Event, EventListener};

use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use core::fmt;
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};

use crate::futures::Lock;
use crate::Mutex;
Expand Down Expand Up @@ -82,21 +82,29 @@ impl Barrier {
BarrierWait {
barrier: self,
lock: Some(self.state.lock()),
evl: EventListener::new(&self.event),
state: WaitState::Initial,
}
}
}

/// The future returned by [`Barrier::wait()`].
pub struct BarrierWait<'a> {
/// The barrier to wait on.
barrier: &'a Barrier,
pin_project_lite::pin_project! {
/// The future returned by [`Barrier::wait()`].
pub struct BarrierWait<'a> {
// The barrier to wait on.
barrier: &'a Barrier,

/// The ongoing mutex lock operation we are blocking on.
lock: Option<Lock<'a, State>>,
// The ongoing mutex lock operation we are blocking on.
#[pin]
lock: Option<Lock<'a, State>>,

/// The current state of the future.
state: WaitState,
// An event listener for the `barrier.event` event.
#[pin]
evl: EventListener,

// The current state of the future.
state: WaitState,
}
}

impl fmt::Debug for BarrierWait<'_> {
Expand All @@ -110,64 +118,61 @@ enum WaitState {
Initial,

/// We are waiting for the listener to complete.
Waiting { evl: EventListener, local_gen: u64 },
Waiting { local_gen: u64 },

/// Waiting to re-acquire the lock to check the state again.
Reacquiring(u64),
Reacquiring { local_gen: u64 },
}

impl Future for BarrierWait<'_> {
type Output = BarrierWaitResult;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let mut this = self.project();

loop {
match this.state {
WaitState::Initial => {
// See if the lock is ready yet.
let mut state = ready!(Pin::new(this.lock.as_mut().unwrap()).poll(cx));
this.lock = None;
let mut state = ready!(this.lock.as_mut().as_pin_mut().unwrap().poll(cx));
this.lock.set(None);

let local_gen = state.generation_id;
state.count += 1;

if state.count < this.barrier.n {
// We need to wait for the event.
this.state = WaitState::Waiting {
evl: this.barrier.event.listen(),
local_gen,
};
this.evl.as_mut().listen();
*this.state = WaitState::Waiting { local_gen };
} else {
// We are the last one.
state.count = 0;
state.generation_id = state.generation_id.wrapping_add(1);
this.barrier.event.notify(std::usize::MAX);
this.barrier.event.notify(core::usize::MAX);
return Poll::Ready(BarrierWaitResult { is_leader: true });
}
}

WaitState::Waiting {
ref mut evl,
local_gen,
} => {
ready!(Pin::new(evl).poll(cx));
WaitState::Waiting { local_gen } => {
ready!(this.evl.as_mut().poll(cx));

// We are now re-acquiring the mutex.
this.lock = Some(this.barrier.state.lock());
this.state = WaitState::Reacquiring(local_gen);
this.lock.set(Some(this.barrier.state.lock()));
*this.state = WaitState::Reacquiring {
local_gen: *local_gen,
};
}

WaitState::Reacquiring(local_gen) => {
WaitState::Reacquiring { local_gen } => {
// Acquire the local state again.
let state = ready!(Pin::new(this.lock.as_mut().unwrap()).poll(cx));
this.lock = None;
let state = ready!(this.lock.as_mut().as_pin_mut().unwrap().poll(cx));
this.lock.set(None);

if local_gen == state.generation_id && state.count < this.barrier.n {
if *local_gen == state.generation_id && state.count < this.barrier.n {
// We need to wait for the event again.
this.state = WaitState::Waiting {
evl: this.barrier.event.listen(),
local_gen,
this.evl.as_mut().listen();
*this.state = WaitState::Waiting {
local_gen: *local_gen,
};
} else {
// We are ready, but not the leader.
Expand Down
27 changes: 26 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//! * [`RwLock`] - a reader-writer lock, allowing any number of readers or a single writer.
//! * [`Semaphore`] - limits the number of concurrent operations.

#![cfg_attr(not(feature = "std"), no_std)]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![doc(
html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
Expand All @@ -15,6 +16,8 @@
html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)]

extern crate alloc;

/// Simple macro to extract the value of `Poll` or return `Pending`.
///
/// TODO: Drop in favor of `core::task::ready`, once MSRV is bumped to 1.64.
Expand All @@ -38,7 +41,7 @@ macro_rules! pin {
let mut $x = $x;
#[allow(unused_mut)]
let mut $x = unsafe {
std::pin::Pin::new_unchecked(&mut $x)
core::pin::Pin::new_unchecked(&mut $x)
};
)*
}
Expand Down Expand Up @@ -69,3 +72,25 @@ pub mod futures {
};
pub use crate::semaphore::{Acquire, AcquireArc};
}

#[cold]
fn abort() -> ! {
// For no_std targets, panicking while panicking is defined as an abort
#[cfg(not(feature = "std"))]
{
struct Bomb;

impl Drop for Bomb {
fn drop(&mut self) {
panic!("Panicking while panicking to abort")
}
}

let _bomb = Bomb;
panic!("Panicking while panicking to abort")
}

// For libstd targets, abort using std::process::abort
#[cfg(feature = "std")]
std::process::abort()
}
Loading

0 comments on commit c788964

Please sign in to comment.