fuchsia_bluetooth/expectation/asynchronous.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
//! This module defines a framework for using the `crate::expectation` module in
//! asynchronous situations, where you wish to detect the satisfiability of the
//! expectations at some undetermined point in the future.
//!
//! An `ExpectationFuture` implements `Future` and will complete when the
//! underlying state satisfies the predicate. Failure can be detected by wrapping
//! the future in a timeout, as is implemented by the method `when_satisfied`.
//!
//! To use `ExpectationFuture`s, you must implement `ExpectableState` for the
//! type of state you wish to track, which defines how the state will update and
//! notify tasks that are waiting on state changes.
//!
//! A common pattern is to await the expectation of a given state, and then
//! check further predicate expectations to determine success or failure at that
//! point in time.
//!
//! e.g.
//!
//! ```ignore
//! // Wait for the action to have completed one way or the other
//! let state = state.when_satisfied(action_complete, timeout).await?;
//! // Then check that the action completed successfully
//! action_success.satisfied(state)
//! ```
use anyhow::{format_err, Error};
use fuchsia_async::{DurationExt, TimeoutExt};
use fuchsia_sync::{MappedRwLockWriteGuard, RwLock, RwLockWriteGuard};
use futures::future::BoxFuture;
use futures::FutureExt;
use slab::Slab;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{self, Poll};
use crate::expectation::Predicate;
/// A future that resolves once a `Predicate` holds for the `T::State` of some
/// observable state holder `T`.
///
/// The future does nothing until it is polled; each poll reads a snapshot of
/// the state through `ExpectableState::read`.
#[must_use = "futures do nothing unless polled"]
pub struct ExpectationFuture<T>
where
    T: ExpectableState,
{
    /// Handle to the state holder being observed.
    state: T,
    /// Predicate the observed state must satisfy for the future to resolve.
    expectation: Predicate<T::State>,
    /// Key returned by `ExpectableState::store_task` for the waker that is
    /// currently registered, or `None` when no waker is registered.
    waker_key: Option<usize>,
}
impl<T: ExpectableState> ExpectationFuture<T> {
    /// Creates a future over `state` that resolves when `expectation` holds.
    /// No waker is registered until the future is first polled.
    fn new(state: T, expectation: Predicate<T::State>) -> ExpectationFuture<T> {
        Self { state, expectation, waker_key: None }
    }

    /// Unregisters the waker recorded by an earlier `store_task`, if any, and
    /// forgets its key.
    fn clear_waker(&mut self) {
        if let Some(registered_key) = self.waker_key.take() {
            self.state.remove_task(registered_key);
        }
    }

    /// Registers the waker carried by `cx` with the state holder and remembers
    /// the key it was stored under so it can be removed later.
    fn store_task(&mut self, cx: &mut task::Context<'_>) {
        self.waker_key = Some(self.state.store_task(cx));
    }
}
// `poll` mutates the future through `Pin<&mut Self>`, which is only possible
// for `Unpin` types; none of the fields are pin-projected.
impl<T> Unpin for ExpectationFuture<T> where T: ExpectableState {}
impl<T: ExpectableState> Future for ExpectationFuture<T> {
    /// The state snapshot that satisfied the expectation.
    type Output = T::State;

    /// Resolves with a snapshot of the state once it satisfies the expectation.
    ///
    /// Returns `Poll::Ready(snapshot)` when the snapshot read during this poll
    /// satisfies the predicate, and `Poll::Pending` otherwise, leaving the
    /// waker from `cx` registered with the state holder.
    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        // Drop the registration made by a previous poll; the waker in `cx` may
        // differ from the one stored last time.
        self.clear_waker();

        // Register *before* reading the state. If the state were read first, a
        // state change and its notification could land between the read and
        // the registration, and this future would never be woken for it.
        self.store_task(cx);

        let snapshot = self.state.read();
        if self.expectation.satisfied(&snapshot) {
            // Resolved: no further wake-ups are wanted.
            self.clear_waker();
            Poll::Ready(snapshot)
        } else {
            Poll::Pending
        }
    }
}
/// Trait for objects that allow futures to trigger based on state changes
///
/// This trait will commonly be used via the `ExpectableStateExt` extension
/// trait which provides the convenient `when_satisfied` function.
///
/// You can implement this trait for your own types which implement their own
/// state tracking and notification of futures, or you can use the
/// `Expectable` type in this module which provides a ready to use
/// implementation.
pub trait ExpectableState: Clone {
/// Type of current state we are tracking
type State: 'static;
/// Register a task as needing waking when state changes.
///
/// Returns a key identifying the registration. `ExpectationFuture` keeps
/// this key and later hands it back to `remove_task`.
fn store_task(&mut self, cx: &mut task::Context<'_>) -> usize;
/// Remove a task from being tracked. Called by `ExpectationFuture` when
/// polled, with a key previously returned by `store_task`.
fn remove_task(&mut self, key: usize);
/// Notify all pending tasks that state has changed
fn notify_state_changed(&self);
/// Read a snapshot of the current State
fn read(&self) -> Self::State;
}
/// Extension trait providing `when_satisfied` on top of `ExpectableState`.
///
/// A blanket implementation in this module covers every `ExpectableState`
/// that is `Send + Sync + 'static` and whose `State` is `Send + Sync + 'static`.
pub trait ExpectableStateExt: ExpectableState + Sized {
/// Convenience method for awaiting expectations on the underlying state
/// Provides a simple syntax for asynchronously awaiting expectations:
///
/// ```ignore
/// // Wait for the action to have completed one way or the other
/// let state = state.when_satisfied(action_complete, timeout).await?;
/// ```
///
/// With the blanket implementation in this module, the returned future
/// yields `Ok(state)` with the snapshot that satisfied `expectation`. If
/// `timeout` elapses first, the expectation is evaluated one last time and
/// an `Err` describing the failed expectation is returned if it still does
/// not hold.
fn when_satisfied(
&self,
expectation: Predicate<Self::State>,
timeout: zx::MonotonicDuration,
) -> BoxFuture<'_, Result<Self::State, Error>>;
}
impl<T> ExpectableStateExt for T
where
    T: ExpectableState + Send + Sync + 'static,
    T::State: Send + Sync + 'static,
{
    /// Waits until `expectation` holds for the tracked state, or until
    /// `timeout` has elapsed.
    ///
    /// Yields `Ok(state)` with the satisfying snapshot. When the timeout fires
    /// the expectation is checked once more against a fresh snapshot: if it
    /// holds, that snapshot is returned; otherwise the error carries the
    /// description of the failed expectation.
    fn when_satisfied(
        &self,
        expectation: Predicate<T::State>,
        timeout: zx::MonotonicDuration,
    ) -> BoxFuture<'_, Result<Self::State, Error>> {
        // The timeout handler is separate from the expectation future, so it
        // gets its own handle to the state and its own copy of the predicate.
        let fallback_state = self.clone();
        let fallback_expectation = expectation.clone();
        let on_expiry = move || {
            let snapshot = fallback_state.read();
            match fallback_expectation.assert_satisfied(&snapshot) {
                Ok(_) => Ok(snapshot),
                Err(err) => {
                    Err(format_err!("Timed out waiting for expectation, last result:\n{:?}", err))
                }
            }
        };

        ExpectationFuture::new(self.clone(), expectation)
            .map(Ok)
            .on_timeout(timeout.after_now(), on_expiry)
            .boxed()
    }
}
/// Inner state for the `Expectable` helper type
pub struct ExpectableInner<S, A> {
/// Current state
pub state: S,
/// Wakers of the pending tasks, keyed by the value returned from `store_task`
tasks: Slab<task::Waker>,
/// Auxiliary shared data
pub aux: A,
}
/// `Expectable<S,A>` is an easy way to build an implementation of `ExpectableState` to await upon.
/// The Aux type `A` is commonly used to hold a FIDL Proxy to drive the behavior under test.
///
/// It is an `Arc`-shared, `RwLock`-protected `ExpectableInner<S, A>`, so cloning an `Expectable`
/// yields another handle to the same state. Use `expectable` to construct one.
pub type Expectable<S, A> = Arc<RwLock<ExpectableInner<S, A>>>;
/// Builds an `Expectable` holding the initial `state` and the auxiliary data `aux`, with no
/// tasks registered for notification.
pub fn expectable<S, A>(state: S, aux: A) -> Expectable<S, A> {
    let inner = ExpectableInner { state, tasks: Slab::new(), aux };
    Arc::new(RwLock::new(inner))
}
impl<S: Clone + 'static, A> ExpectableState for Expectable<S, A> {
    type State = S;

    /// Register a task as needing waking when state changes
    ///
    /// Clones the waker out of `cx`, stores it, and returns the slab key it was stored under.
    fn store_task(&mut self, cx: &mut task::Context<'_>) -> usize {
        self.write().tasks.insert(cx.waker().clone())
    }

    /// Remove a task from being tracked
    ///
    /// Keys that are not currently present are ignored.
    ///
    /// NOTE(review): `notify_state_changed` empties the slab, after which keys handed out
    /// earlier appear to be reused for new registrations. A future still holding such a stale
    /// key could then remove another future's waker here — confirm, and consider tagging keys
    /// with a generation if so.
    fn remove_task(&mut self, key: usize) {
        let mut harness = self.write();
        if harness.tasks.contains(key) {
            drop(harness.tasks.remove(key));
        }
    }

    /// Notify all pending tasks that state has changed
    ///
    /// Every registered waker is removed and then woken; tasks that are still interested
    /// re-register themselves the next time they are polled.
    fn notify_state_changed(&self) {
        // Take all wakers under a single write lock. Waking under a read lock and clearing
        // under a separate write lock would silently drop any waker registered in between
        // without ever waking it.
        let wakers: Vec<task::Waker> = self.write().tasks.drain().collect();
        // The lock is released before waking so that a waker which runs code touching this
        // `Expectable` cannot contend with a lock held by this function.
        for waker in wakers {
            waker.wake();
        }
    }

    /// Read a snapshot of the current State
    ///
    /// The snapshot is a clone taken under the read lock.
    fn read(&self) -> Self::State {
        // `RwLock::read` is spelled out because `self.read()` would resolve to this method.
        RwLock::read(self).state.clone()
    }
}
/// A trait to provide convenience methods on Expectable types held within wrapping harnesses
pub trait ExpectableExt<S, A> {
/// Mutable access to the auxiliary data
fn aux(&self) -> MappedRwLockWriteGuard<'_, A>;
/// Mutable access to the state
fn write_state(&self) -> MappedRwLockWriteGuard<'_, S>;
}
// All `Expectable<S,A>` provide these methods, and thus so do any types implementing `Deref` whose
// targets are `Expectable<S,A>`
impl<S, A> ExpectableExt<S, A> for Expectable<S, A> {
fn aux(&self) -> MappedRwLockWriteGuard<'_, A> {
RwLockWriteGuard::map(self.write(), |harness| &mut harness.aux)
}
fn write_state(&self) -> MappedRwLockWriteGuard<'_, S> {
RwLockWriteGuard::map(self.write(), |harness| &mut harness.state)
}
}