1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
45//! An event that can be signaled and waited on by multiple consumers.
67use fuchsia_sync::Mutex;
8use futures::future::{FusedFuture, Future};
9use slab::Slab;
10use std::fmt;
11use std::pin::Pin;
12use std::sync::Arc;
13use std::task::{Context, Poll, Waker};
/// Sentinel stored in `EventWaitResult::waker_key` while the future has no waker registered in the
/// shared slab.
const NULL_WAKER_KEY: usize = usize::MAX;
1617/// An `Event` is a clonable object that can be signaled once. Calls to `.wait()` produce a future,
18/// `EventWait`, that can wait on that signal. Once the `Event` has been signaled, all futures will
19/// complete immediately.
20#[derive(Clone)]
21pub struct Event {
22 inner: Arc<EventSignaler>,
23}
2425impl Event {
26/// Create a new `Event` that has not yet been signaled.
27pub fn new() -> Self {
28Self {
29 inner: Arc::new(EventSignaler {
30 inner: Arc::new(Mutex::new(EventState {
31 state: State::Waiting,
32 wakers: Slab::new(),
33 })),
34 }),
35 }
36 }
3738/// Signal the `Event`. Once this is done, it cannot be undone. Any tasks waiting on this
39 /// `Event` will be notified and its `Future` implementation will complete.
40 ///
41 /// Returns true if this `Event` was the one that performed the signal operation.
42pub fn signal(&self) -> bool {
43self.inner.set(State::Signaled)
44 }
4546/// Return true if `Event::signal` has already been called.
47pub fn signaled(&self) -> bool {
48self.inner.inner.lock().state == State::Signaled
49 }
5051/// Create a new `EventWait` future that will complete after this event has been signaled.
52 /// If all signalers are dropped, this future will continue to return `Poll::Pending`. To be
53 /// notified when all signalers are dropped without signaling, use `wait_or_dropped`.
54pub fn wait(&self) -> EventWait {
55 EventWait { inner: self.wait_or_dropped() }
56 }
5758/// Create a new `EventWaitResult` future that will complete after this event has been
59 /// signaled or all `Event` clones have been dropped.
60 ///
61 /// This future will output a `Result<(), Dropped>` to indicate what has occurred.
62pub fn wait_or_dropped(&self) -> EventWaitResult {
63 EventWaitResult {
64 inner: (*self.inner).inner.clone(),
65 waker_key: NULL_WAKER_KEY,
66 terminated: false,
67 }
68 }
69}
7071impl fmt::Debug for Event {
72fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73write!(f, "Event {{ state: {:?} }}", self.inner.inner.lock().state)
74 }
75}
/// Lifecycle of an `Event`, as seen by every clone and every waiting future.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// Initial state: `signal` has not been called yet.
    Waiting,
    /// `signal` has been called on the `Event`.
    Signaled,
    /// Every clone of the `Event` was dropped without `signal` being called. An `Event` can never
    /// move out of the `Dropped` state.
    Dropped,
}
8889/// Tracks state shared by all Event clones and futures.
90struct EventState {
91pub state: State,
92pub wakers: Slab<Waker>,
93}
9495/// A handle shared between all `Event` structs for a given event. Once all `Event`s are dropped,
96/// this will be dropped and will notify the `EventState` that it is unreachable by any signalers
97/// and will never be signaled if it hasn't been already.
98struct EventSignaler {
99 inner: Arc<Mutex<EventState>>,
100}
101102impl EventSignaler {
103/// Internal function to set the self.inner.state value if it has not already been set to
104 /// `State::Signaled`. Returns true if this function call changed the value of self.inner.state.
105fn set(&self, state: State) -> bool {
106assert!(state != State::Waiting, "Cannot reset the state to Waiting");
107let mut guard = self.inner.lock();
108if let State::Signaled = guard.state {
109// Avoid double panicking.
110if !std::thread::panicking() {
111assert!(
112 guard.wakers.is_empty(),
113"If there are wakers, a race condition is present"
114);
115 }
116false
117} else {
118let mut wakers = std::mem::replace(&mut guard.wakers, Slab::new());
119 guard.state = state;
120 drop(guard);
121for waker in wakers.drain() {
122 waker.wake();
123 }
124true
125}
126 }
127}
128129impl Drop for EventSignaler {
130fn drop(&mut self) {
131// Indicate that all `Event` clones have been dropped. This does not set the value if it
132 // has already been set to `State::Signaled`.
133let _: bool = self.set(State::Dropped);
134 }
135}
136137/// Future implementation for `Event::wait_or_dropped`.
138#[must_use = "futures do nothing unless polled"]
139pub struct EventWaitResult {
140 inner: Arc<Mutex<EventState>>,
141 waker_key: usize,
142 terminated: bool,
143}
144145impl Future for EventWaitResult {
146type Output = Result<(), Dropped>;
147148fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
149// `this: &mut Self` allows the compiler to track access to individual fields of Self as
150 // distinct borrows.
151let this = self.get_mut();
152let mut guard = this.inner.lock();
153154match guard.state {
155 State::Waiting => {
156let mut new_key = None;
157if this.waker_key == NULL_WAKER_KEY || !guard.wakers.contains(this.waker_key) {
158 new_key = Some(guard.wakers.insert(cx.waker().clone()));
159 } else {
160 guard.wakers[this.waker_key] = cx.waker().clone();
161 }
162163if let Some(key) = new_key {
164 this.waker_key = key;
165 }
166167 Poll::Pending
168 }
169 State::Signaled => {
170 this.terminated = true;
171 this.waker_key = NULL_WAKER_KEY;
172 Poll::Ready(Ok(()))
173 }
174 State::Dropped => {
175 this.terminated = true;
176 this.waker_key = NULL_WAKER_KEY;
177 Poll::Ready(Err(Dropped))
178 }
179 }
180 }
181}
182183impl FusedFuture for EventWaitResult {
184fn is_terminated(&self) -> bool {
185self.terminated
186 }
187}
188189impl Unpin for EventWaitResult {}
190191impl Drop for EventWaitResult {
192fn drop(&mut self) {
193if self.waker_key != NULL_WAKER_KEY {
194// Cleanup the EventWaitResult's waker one is present in the wakers slab.
195let mut guard = self.inner.lock();
196if guard.wakers.contains(self.waker_key) {
197let _ = guard.wakers.remove(self.waker_key);
198 }
199 }
200 }
201}
202203/// Future implementation for `Event::wait`. This future only completes when the event is signaled.
204/// If all signalers are dropped, `EventWait` continues to return `Poll::Pending`.
205#[must_use = "futures do nothing unless polled"]
206pub struct EventWait {
207 inner: EventWaitResult,
208}
209210impl Future for EventWait {
211type Output = ();
212213fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
214match Pin::new(&mut self.inner).poll(cx) {
215 Poll::Ready(Ok(())) => Poll::Ready(()),
216_ => Poll::Pending,
217 }
218 }
219}
220221impl FusedFuture for EventWait {
222fn is_terminated(&self) -> bool {
223self.inner.is_terminated()
224 }
225}
226227impl Unpin for EventWait {}
/// Error produced by an `EventWaitResult` when every `Event` clone is dropped before the event is
/// signaled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Dropped;

impl fmt::Display for Dropped {
    /// Writes the fixed message `event dropped`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("event dropped")
    }
}

impl std::error::Error for Dropped {}
#[cfg(test)]
mod tests {
    use super::*;
    use fuchsia_async as fasync;

    // TODO: Add tests to check waker count in EventWait and EventWaitResult.

    /// `signaled()` reflects a `signal()` call on every clone of the event.
    #[test]
    fn signaled_method_respects_signaling() {
        let event = Event::new();
        let event_clone = event.clone();

        assert!(!event.signaled());
        assert!(!event_clone.signaled());

        assert!(event.signal());

        assert!(event.signaled());
        assert!(event_clone.signaled());
    }

    /// Both future flavors stay pending while the event is unsignaled.
    #[test]
    fn unsignaled_event_is_pending() {
        let mut ex = fasync::TestExecutor::new();

        let event = Event::new();
        let mut wait = event.wait();
        let mut wait_or_dropped = event.wait_or_dropped();
        assert!(ex.run_until_stalled(&mut wait).is_pending());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
    }

    /// Futures that are first polled after the signal complete immediately.
    #[test]
    fn signaled_event_is_ready() {
        let mut ex = fasync::TestExecutor::new();

        let event = Event::new();
        let mut wait = event.wait();
        let mut wait_or_dropped = event.wait_or_dropped();
        assert!(event.signal());
        assert!(ex.run_until_stalled(&mut wait).is_ready());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_ready());
    }

    /// Futures that registered a waker before the signal complete once it arrives.
    #[test]
    fn event_is_ready_and_wakes_after_stalled() {
        let mut ex = fasync::TestExecutor::new();

        let event = Event::new();
        let mut wait = event.wait();
        let mut wait_or_dropped = event.wait_or_dropped();
        assert!(ex.run_until_stalled(&mut wait).is_pending());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
        assert!(event.signal());
        assert!(ex.run_until_stalled(&mut wait).is_ready());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_ready());
    }

    #[test]
    fn signaling_event_registers_and_wakes_multiple_waiters_properly() {
        let mut ex = fasync::TestExecutor::new();

        let event = Event::new();
        let mut wait_1 = event.wait();
        let mut wait_2 = event.wait();
        let mut wait_3 = event.wait();

        // Multiple waiters events are pending correctly.
        assert!(ex.run_until_stalled(&mut wait_1).is_pending());
        assert!(ex.run_until_stalled(&mut wait_2).is_pending());

        assert!(event.signal());

        // Both previously registered and unregistered event waiters complete correctly.
        assert!(ex.run_until_stalled(&mut wait_1).is_ready());
        assert!(ex.run_until_stalled(&mut wait_2).is_ready());
        assert!(ex.run_until_stalled(&mut wait_3).is_ready());
    }

    #[test]
    fn event_is_terminated_after_complete() {
        let mut ex = fasync::TestExecutor::new();

        let event = Event::new();
        let mut wait = event.wait();
        let mut wait_or_dropped = event.wait_or_dropped();
        assert!(ex.run_until_stalled(&mut wait).is_pending());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
        assert!(!wait.is_terminated());
        assert!(!wait_or_dropped.is_terminated());
        assert!(event.signal());
        assert!(ex.run_until_stalled(&mut wait).is_ready());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_ready());
        assert!(wait.is_terminated());
        assert!(wait_or_dropped.is_terminated());
    }

    /// Dropping polled futures unregisters their wakers, and signaling afterwards still works.
    #[test]
    fn waiter_drops_gracefully() {
        let mut ex = fasync::TestExecutor::new();

        let event = Event::new();
        let mut wait = event.wait();
        // Exercise both future flavors: `EventWait` above and `EventWaitResult` here.
        let mut wait_or_dropped = event.wait_or_dropped();
        assert!(ex.run_until_stalled(&mut wait).is_pending());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
        assert!(!wait.is_terminated());
        assert!(!wait_or_dropped.is_terminated());
        assert_eq!(event.inner.inner.lock().wakers.len(), 2);
        drop(wait);
        drop(wait_or_dropped);
        assert_eq!(event.inner.inner.lock().wakers.len(), 0);
        assert!(event.signal());
    }

    /// `wait_or_dropped` completes only after the last clone of the event is dropped, while
    /// `wait` stays pending forever.
    #[test]
    fn waiter_completes_after_all_events_drop() {
        let mut ex = fasync::TestExecutor::new();

        let event = Event::new();
        // Must be a clone of the same event so that dropping `event` alone is not enough.
        let event_clone = event.clone();
        let mut wait = event.wait();
        let mut wait_or_dropped = event.wait_or_dropped();
        assert!(ex.run_until_stalled(&mut wait).is_pending());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());
        assert!(!wait.is_terminated());
        assert!(!wait_or_dropped.is_terminated());

        // One clone is still alive, so nothing completes yet.
        drop(event);
        assert!(ex.run_until_stalled(&mut wait).is_pending());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_pending());

        // Dropping the last clone resolves `wait_or_dropped` but not `wait`.
        drop(event_clone);
        assert!(ex.run_until_stalled(&mut wait).is_pending());
        assert!(ex.run_until_stalled(&mut wait_or_dropped).is_ready());
    }

    #[test]
    fn drop_receiver_after_poll_without_event_signal() {
        let mut exec = fasync::TestExecutor::new();
        let event = Event::new();
        let mut waiter = event.wait_or_dropped();
        assert!(exec.run_until_stalled(&mut waiter).is_pending());
        drop(event);
        drop(waiter);
    }

    #[test]
    fn drop_receiver_after_event_signal_without_repoll() {
        let mut exec = fasync::TestExecutor::new();
        let event = Event::new();
        let mut waiter = event.wait_or_dropped();
        assert_eq!(event.inner.inner.lock().wakers.len(), 0);

        // Polling the waiter will register a new waker.
        assert!(exec.run_until_stalled(&mut waiter).is_pending());
        assert_eq!(event.inner.inner.lock().wakers.len(), 1);

        // The waiter's waker is used.
        assert!(event.signal());
        assert_eq!(event.inner.inner.lock().wakers.len(), 0);

        // Dropping a waiter without polling it is valid.
        drop(waiter);
    }
}