fuchsia_bluetooth/expectation/
asynchronous.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! This module defines a framework for using the `crate::expectation` module in
//! asynchronous situations, where you wish to detect the satisfiability of the
//! expectations at some undetermined point in the future.
//!
//! An `ExpectationFuture` implements `Future` and will complete when the
//! underlying state satisfies the predicate. Failure can be detected by wrapping
//! the future in a timeout, as is implemented by the method `when_satisfied`.
//!
//! To use `ExpectationFuture`s, you must implement `ExpectableState` for the
//! type of state you wish to track, which defines how the state will update and
//! notify tasks that are waiting on state changes.
//!
//! A common pattern is to await the expectation of a given state, and then
//! check further predicate expectations to determine success or failure at that
//! point in time.
//!
//! e.g.
//!
//!   ```ignore
//!   // Wait for the action to have completed one way or the other
//!   let state = state.when_satisfied(action_complete, timeout).await?;
//!   // Then check that the action completed successfully
//!   action_success.satisfied(state)
//!   ```

use anyhow::{format_err, Error};
use fuchsia_async::{DurationExt, TimeoutExt};
use fuchsia_sync::{MappedRwLockWriteGuard, RwLock, RwLockWriteGuard};

use futures::future::BoxFuture;
use futures::FutureExt;
use slab::Slab;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{self, Poll};

use crate::expectation::Predicate;

/// Future that completes once a Predicate is satisfied for the `T::State` type
/// where `T` is some type that allows monitoring of State updates
///
/// The future resolves to the `T::State` snapshot that satisfied the predicate.
#[must_use = "futures do nothing unless polled"]
pub struct ExpectationFuture<T: ExpectableState> {
    /// Handle used to read state snapshots and to register/remove this
    /// future's waker.
    state: T,
    /// Predicate a state snapshot must satisfy for the future to complete.
    expectation: Predicate<T::State>,
    /// Key returned by `T::store_task` for the waker registered by the most
    /// recent pending poll, or `None` when no waker is registered.
    waker_key: Option<usize>,
}

impl<T: ExpectableState> ExpectationFuture<T> {
    /// Creates a future that waits for `expectation` to hold on `state`.
    /// No waker is registered until the future is first polled.
    fn new(state: T, expectation: Predicate<T::State>) -> ExpectationFuture<T> {
        Self { state, expectation, waker_key: None }
    }

    /// Drops this future's waker registration, if it has one, and forgets
    /// the associated key.
    fn clear_waker(&mut self) {
        if let Some(registered_key) = self.waker_key.take() {
            self.state.remove_task(registered_key);
        }
    }

    /// Registers the waker from `cx` with the state and remembers the key
    /// under which it was stored.
    fn store_task(&mut self, cx: &mut task::Context<'_>) {
        self.waker_key = Some(self.state.store_task(cx));
    }
}

// `poll` below mutates the future through `Pin<&mut Self>` (it calls the
// `&mut self` helpers `clear_waker` and `store_task`), which requires `Self:
// Unpin`. Declaring it unconditionally keeps that working for every `T`.
impl<T: ExpectableState> std::marker::Unpin for ExpectationFuture<T> {}

impl<T: ExpectableState> Future for ExpectationFuture<T> {
    /// The state snapshot that satisfied the expectation.
    type Output = T::State;

    /// Polls the expectation against a fresh snapshot of the state.
    ///
    /// Returns `Poll::Ready(snapshot)` when the snapshot satisfies the
    /// expectation, otherwise `Poll::Pending` with the waker from `cx`
    /// registered so the task is woken on the next state-change notification.
    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        // Replace any registration left over from a previous poll; the waker
        // in `cx` may differ from the one stored last time.
        self.clear_waker();

        // Register *before* reading the state. If the state were read first,
        // a state change and notification arriving between the read and the
        // registration would find no waker to wake, and this future would keep
        // waiting on a stale snapshot until some later notification.
        self.store_task(cx);

        let state = self.state.read();
        if self.expectation.satisfied(&state) {
            // Finished: do not leave a registration behind for a future that
            // will not be polled again.
            self.clear_waker();
            Poll::Ready(state)
        } else {
            Poll::Pending
        }
    }
}

/// Trait for objects that allow futures to trigger based on state changes
///
/// This trait will commonly be used via the `ExpectableStateExt` extension
/// trait which provides the convenient `when_satisfied` function.
///
/// You can implement this trait for your own types which implement their own
/// state tracking and notification of futures, or you can use the
/// `Expectable` type in this module which provides a ready to use
/// implementation.
pub trait ExpectableState: Clone {
    /// Type of current state we are tracking
    type State: 'static;

    /// Register a task as needing waking when state changes
    ///
    /// Returns a key identifying the registration; `ExpectationFuture` keeps
    /// it and later hands it back to `remove_task`.
    fn store_task(&mut self, cx: &mut task::Context<'_>) -> usize;

    /// Remove a task from being tracked. Called by `ExpectationFuture` when
    /// polled.
    ///
    /// `key` is a value previously returned by `store_task`.
    fn remove_task(&mut self, key: usize);

    /// Notify all pending tasks that state has changed
    fn notify_state_changed(&self);

    /// Read a snapshot of the current State
    ///
    /// The snapshot is returned by value; `ExpectationFuture` evaluates its
    /// predicate against it and yields it as the future's output.
    fn read(&self) -> Self::State;
}

/// Extension trait adding `when_satisfied` to `ExpectableState` types.
///
/// This module provides a blanket implementation for every `ExpectableState`
/// that is `Send + Sync + 'static` and whose `State` is `Send + Sync + 'static`.
pub trait ExpectableStateExt: ExpectableState + Sized {
    /// Convenience method for awaiting expectations on the underlying state
    /// Provides a simple syntax for asynchronously awaiting expectations:
    ///
    ///  ```ignore
    ///    // Wait for the action to have completed one way or the other
    ///    let state = state.when_satisfied(action_complete, timeout).await?;
    ///  ```
    ///
    /// `expectation` is the predicate to wait for and `timeout` is how long,
    /// measured from the call, to wait before giving up. The returned future
    /// yields the state snapshot on success and an `Error` otherwise.
    fn when_satisfied(
        &self,
        expectation: Predicate<Self::State>,
        timeout: zx::MonotonicDuration,
    ) -> BoxFuture<'_, Result<Self::State, Error>>;
}

impl<T: ExpectableState + Sized> ExpectableStateExt for T
where
    T: Send + Sync + 'static,
    T::State: Send + Sync + 'static,
{
    /// Waits until `expectation` holds, or until `timeout` has elapsed.
    ///
    /// When the timeout fires the expectation is evaluated one last time
    /// against a fresh snapshot: if it holds, that snapshot is returned as
    /// `Ok`; otherwise the error describes the last failed evaluation.
    fn when_satisfied(
        &self,
        expectation: Predicate<T::State>,
        timeout: zx::MonotonicDuration,
    ) -> BoxFuture<'_, Result<Self::State, Error>> {
        // The timeout handler needs its own handle and predicate, since the
        // originals are moved into the `ExpectationFuture`.
        let timeout_state = self.clone();
        let timeout_expectation = expectation.clone();
        let deadline = timeout.after_now();

        let on_deadline = move || {
            let snapshot = timeout_state.read();
            match timeout_expectation.assert_satisfied(&snapshot) {
                Ok(_) => Ok(snapshot),
                Err(err) => Err(format_err!(
                    "Timed out waiting for expectation, last result:\n{:?}",
                    err
                )),
            }
        };

        ExpectationFuture::new(self.clone(), expectation)
            .map(Ok)
            .on_timeout(deadline, on_deadline)
            .boxed()
    }
}

/// Inner state for the `Expectable` helper type
pub struct ExpectableInner<S, A> {
    /// Current state
    pub state: S,

    /// Wakers of pending tasks, stored under the key that `store_task`
    /// returned for them
    tasks: Slab<task::Waker>,

    /// Auxiliary shared data
    pub aux: A,
}

/// `Expectable<S,A>` is an easy way to build an implementation of `ExpectableState` to await upon.
/// The Aux type `A` is commonly used to hold a FIDL Proxy to drive the behavior under test.
///
/// Because this is an `Arc`, cloning an `Expectable` yields another handle to the same
/// lock-protected `ExpectableInner` rather than a copy of the state.
pub type Expectable<S, A> = Arc<RwLock<ExpectableInner<S, A>>>;

/// Builds an `Expectable` holding the initial `state` and the auxiliary data
/// `aux`, with no tasks registered for wake-up.
pub fn expectable<S, A>(state: S, aux: A) -> Expectable<S, A> {
    let inner = ExpectableInner { state, tasks: Slab::new(), aux };
    Arc::new(RwLock::new(inner))
}

impl<S: Clone + 'static, A> ExpectableState for Expectable<S, A> {
    type State = S;

    /// Register a task as needing waking when state changes
    ///
    /// Stores a clone of the waker from `cx` and returns the slab key under
    /// which it was stored.
    fn store_task(&mut self, cx: &mut task::Context<'_>) -> usize {
        self.write().tasks.insert(cx.waker().clone())
    }

    /// Remove a task from being tracked
    ///
    /// A key with no entry is ignored rather than treated as an error:
    /// `notify_state_changed` empties the slab, so a future polled after a
    /// notification holds a key that is no longer present.
    // NOTE(review): the slab reuses vacated keys, so a key that went stale
    // through `notify_state_changed` may by now belong to a different task's
    // waker, which this call would then remove. Fixing that needs a change
    // to the trait contract — confirm whether callers can hit this.
    fn remove_task(&mut self, key: usize) {
        let mut harness = self.write();
        if harness.tasks.contains(key) {
            drop(harness.tasks.remove(key));
        }
    }

    /// Notify all pending tasks that state has changed
    ///
    /// Every registered waker is removed and woken exactly once.
    fn notify_state_changed(&self) {
        // Remove all wakers in a single critical section. Waking under a read
        // lock and clearing afterwards under a separate write lock leaves a
        // gap in which a woken task can re-poll and register a fresh waker,
        // only for the subsequent clear to discard it (a lost wake-up).
        let wakers: Vec<task::Waker> = self.write().tasks.drain().collect();

        // The lock guard was released at the end of the statement above, so
        // woken tasks can take the lock immediately.
        for waker in wakers {
            waker.wake();
        }
    }

    /// Read a snapshot of the current State
    ///
    /// Returns a clone of the state taken under the read lock.
    fn read(&self) -> Self::State {
        // Spelled `RwLock::read` so that the lock's method is called rather
        // than this trait method.
        RwLock::read(self).state.clone()
    }
}

/// A trait to provide convenience methods on Expectable types held within wrapping harnesses
pub trait ExpectableExt<S, A> {
    /// Mutable access to the auxiliary data
    ///
    /// The returned value is a write guard mapped to the `A` value.
    fn aux(&self) -> MappedRwLockWriteGuard<'_, A>;

    /// Mutable access to the state
    ///
    /// The returned value is a write guard mapped to the `S` value.
    fn write_state(&self) -> MappedRwLockWriteGuard<'_, S>;
}

// Every `Expectable<S,A>` provides these methods, and therefore so does any type implementing
// `Deref` whose target is `Expectable<S,A>`
impl<S, A> ExpectableExt<S, A> for Expectable<S, A> {
    /// Takes the write lock and narrows the guard to the auxiliary data.
    fn aux(&self) -> MappedRwLockWriteGuard<'_, A> {
        let guard = self.write();
        RwLockWriteGuard::map(guard, |inner| &mut inner.aux)
    }

    /// Takes the write lock and narrows the guard to the tracked state.
    fn write_state(&self) -> MappedRwLockWriteGuard<'_, S> {
        let guard = self.write();
        RwLockWriteGuard::map(guard, |inner| &mut inner.state)
    }
}