use fuchsia_sync::Mutex;
use futures::future::{FusedFuture, Future};
use slab::Slab;
use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
const NULL_WAKER_KEY: usize = usize::max_value();
#[derive(Clone)]
pub struct Event {
inner: Arc<EventSignaler>,
}
impl Event {
pub fn new() -> Self {
Self {
inner: Arc::new(EventSignaler {
inner: Arc::new(Mutex::new(EventState {
state: State::Waiting,
wakers: Slab::new(),
})),
}),
}
}
pub fn signal(&self) -> bool {
self.inner.set(State::Signaled)
}
pub fn signaled(&self) -> bool {
self.inner.inner.lock().state == State::Signaled
}
pub fn wait(&self) -> EventWait {
EventWait { inner: self.wait_or_dropped() }
}
pub fn wait_or_dropped(&self) -> EventWaitResult {
EventWaitResult {
inner: (*self.inner).inner.clone(),
waker_key: NULL_WAKER_KEY,
terminated: false,
}
}
}
impl fmt::Debug for Event {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Event {{ state: {:?} }}", self.inner.inner.lock().state)
}
}
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
enum State {
Waiting,
Signaled,
Dropped,
}
struct EventState {
pub state: State,
pub wakers: Slab<Waker>,
}
struct EventSignaler {
inner: Arc<Mutex<EventState>>,
}
impl EventSignaler {
fn set(&self, state: State) -> bool {
assert!(state != State::Waiting, "Cannot reset the state to Waiting");
let mut guard = self.inner.lock();
if let State::Signaled = guard.state {
if !std::thread::panicking() {
assert!(
guard.wakers.is_empty(),
"If there are wakers, a race condition is present"
);
}
false
} else {
let mut wakers = std::mem::replace(&mut guard.wakers, Slab::new());
guard.state = state;
drop(guard);
for waker in wakers.drain() {
waker.wake();
}
true
}
}
}
impl Drop for EventSignaler {
fn drop(&mut self) {
let _: bool = self.set(State::Dropped);
}
}
#[must_use = "futures do nothing unless polled"]
pub struct EventWaitResult {
inner: Arc<Mutex<EventState>>,
waker_key: usize,
terminated: bool,
}
impl Future for EventWaitResult {
type Output = Result<(), Dropped>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let mut guard = this.inner.lock();
match guard.state {
State::Waiting => {
let mut new_key = None;
if this.waker_key == NULL_WAKER_KEY || !guard.wakers.contains(this.waker_key) {
new_key = Some(guard.wakers.insert(cx.waker().clone()));
} else {
guard.wakers[this.waker_key] = cx.waker().clone();
}
if let Some(key) = new_key {
this.waker_key = key;
}
Poll::Pending
}
State::Signaled => {
this.terminated = true;
this.waker_key = NULL_WAKER_KEY;
Poll::Ready(Ok(()))
}
State::Dropped => {
this.terminated = true;
this.waker_key = NULL_WAKER_KEY;
Poll::Ready(Err(Dropped))
}
}
}
}
impl FusedFuture for EventWaitResult {
fn is_terminated(&self) -> bool {
self.terminated
}
}
impl Unpin for EventWaitResult {}
impl Drop for EventWaitResult {
fn drop(&mut self) {
if self.waker_key != NULL_WAKER_KEY {
let mut guard = self.inner.lock();
if guard.wakers.contains(self.waker_key) {
let _ = guard.wakers.remove(self.waker_key);
}
}
}
}
#[must_use = "futures do nothing unless polled"]
pub struct EventWait {
inner: EventWaitResult,
}
impl Future for EventWait {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.inner).poll(cx) {
Poll::Ready(Ok(())) => Poll::Ready(()),
_ => Poll::Pending,
}
}
}
impl FusedFuture for EventWait {
fn is_terminated(&self) -> bool {
self.inner.is_terminated()
}
}
impl Unpin for EventWait {}
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub struct Dropped;
impl fmt::Display for Dropped {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "event dropped")
}
}
impl std::error::Error for Dropped {}
#[cfg(test)]
mod tests {
use super::*;
use fuchsia_async as fasync;
#[test]
fn signaled_method_respects_signaling() {
let event = Event::new();
let event_clone = event.clone();
assert!(!event.signaled());
assert!(!event_clone.signaled());
assert!(event.signal());
assert!(event.signaled());
assert!(event_clone.signaled());
}
#[test]
fn unsignaled_event_is_pending() {
let mut ex = fasync::TestExecutor::new();
let event = Event::new();
let mut wait = event.wait();
let mut wait_or_dropped = event.wait_or_dropped();
assert!(ex.run_until_stalled(&mut wait).is_pending());
assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
}
#[test]
fn signaled_event_is_ready() {
let mut ex = fasync::TestExecutor::new();
let event = Event::new();
let mut wait = event.wait();
let mut wait_or_dropped = event.wait_or_dropped();
assert!(event.signal());
assert!(ex.run_until_stalled(&mut wait).is_ready());
assert!(ex.run_until_stalled(&mut wait_or_dropped).is_ready());
}
#[test]
fn event_is_ready_and_wakes_after_stalled() {
let mut ex = fasync::TestExecutor::new();
let event = Event::new();
let mut wait = event.wait();
let mut wait_or_dropped = event.wait_or_dropped();
assert!(ex.run_until_stalled(&mut wait).is_pending());
assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
assert!(event.signal());
assert!(ex.run_until_stalled(&mut wait).is_ready());
assert!(ex.run_until_stalled(&mut wait_or_dropped).is_ready());
}
#[test]
fn signaling_event_registers_and_wakes_multiple_waiters_properly() {
let mut ex = fasync::TestExecutor::new();
let event = Event::new();
let mut wait_1 = event.wait();
let mut wait_2 = event.wait();
let mut wait_3 = event.wait();
assert!(ex.run_until_stalled(&mut wait_1).is_pending());
assert!(ex.run_until_stalled(&mut wait_2).is_pending());
assert!(event.signal());
assert!(ex.run_until_stalled(&mut wait_1).is_ready());
assert!(ex.run_until_stalled(&mut wait_2).is_ready());
assert!(ex.run_until_stalled(&mut wait_3).is_ready());
}
#[test]
fn event_is_terminated_after_complete() {
let mut ex = fasync::TestExecutor::new();
let event = Event::new();
let mut wait = event.wait();
let mut wait_or_dropped = event.wait_or_dropped();
assert!(ex.run_until_stalled(&mut wait).is_pending());
assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
assert!(!wait.is_terminated());
assert!(!wait_or_dropped.is_terminated());
assert!(event.signal());
assert!(ex.run_until_stalled(&mut wait).is_ready());
assert!(ex.run_until_stalled(&mut wait_or_dropped).is_ready());
assert!(wait.is_terminated());
assert!(wait_or_dropped.is_terminated());
}
#[test]
fn waiter_drops_gracefully() {
let mut ex = fasync::TestExecutor::new();
let event = Event::new();
let mut wait = event.wait();
let mut wait_or_dropped = event.wait();
assert!(ex.run_until_stalled(&mut wait).is_pending());
assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
assert!(!wait.is_terminated());
assert!(!wait_or_dropped.is_terminated());
drop(wait);
drop(wait_or_dropped);
assert!(event.signal());
}
#[test]
fn waiter_completes_after_all_events_drop() {
let mut ex = fasync::TestExecutor::new();
let event = Event::new();
let event_clone = Event::new();
let mut wait = event.wait();
let mut wait_or_dropped = event.wait_or_dropped();
assert!(ex.run_until_stalled(&mut wait).is_pending());
assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
assert!(!wait.is_terminated());
assert!(!wait_or_dropped.is_terminated());
drop(event);
drop(event_clone);
assert!(ex.run_until_stalled(&mut wait).is_pending());
assert!(ex.run_until_stalled(&mut wait_or_dropped).is_ready());
}
#[test]
fn drop_receiver_after_poll_without_event_signal() {
let mut exec = fasync::TestExecutor::new();
let event = Event::new();
let mut waiter = event.wait_or_dropped();
assert!(exec.run_until_stalled(&mut waiter).is_pending());
drop(event);
drop(waiter);
}
#[test]
fn drop_receiver_after_event_signal_without_repoll() {
let mut exec = fasync::TestExecutor::new();
let event = Event::new();
let mut waiter = event.wait_or_dropped();
assert_eq!(event.inner.inner.lock().wakers.len(), 0);
assert!(exec.run_until_stalled(&mut waiter).is_pending());
assert_eq!(event.inner.inner.lock().wakers.len(), 1);
assert!(event.signal());
assert_eq!(event.inner.inner.lock().wakers.len(), 0);
drop(waiter);
}
}