1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
use {
fuchsia_sync::RwLock,
futures::{
stream::{iter, FusedStream},
StreamExt,
},
std::{fmt::Debug, pin::Pin, sync::Arc},
};
#[derive(Debug, PartialEq)]
enum StreamStatus {
Halted,
Ready,
}
type SharedAtomicOperationStatus = Arc<RwLock<StreamStatus>>;
// The Token is the sole entity that is allowed to write the operation status. On creation, it
// sets the state to Running. One drop, it sets the state to NotRunning.
#[derive(Debug)]
pub struct Token {
inner: SharedAtomicOperationStatus,
}
impl Token {
fn new(inner: SharedAtomicOperationStatus) -> Self {
*inner.write() = StreamStatus::Halted;
Self { inner }
}
}
impl Drop for Token {
fn drop(&mut self) {
*self.inner.write() = StreamStatus::Ready;
}
}
// Wrap a stream and an overall atomic operation status for work on that stream such that it can
// produce at most one stream item at a time.
//
// The AtomicOneshotStream respects the state that has been set by its latest Token when doling out
// new atomic streams to callers. If the state has been set to Running, the caller is handed back
// a stream that is complete. If the state is NotRunning, a new token is created when the
// underlying stream produces an item. This token can be held by the caller to prevent the
// AtomicOneshotStream from giving out non-complete streams. Once the token is dropped, future
// streams produced by the AtomicOneshotStream will be capable of producing items.
pub struct AtomicOneshotStream<S: FusedStream + Unpin> {
stream: S,
status: SharedAtomicOperationStatus,
}
impl<S> AtomicOneshotStream<S>
where
S: FusedStream + Unpin,
{
pub fn new(stream: S) -> Self {
let status = Arc::new(RwLock::new(StreamStatus::Ready));
Self { stream, status }
}
pub fn get_atomic_oneshot_stream(
&mut self,
) -> Pin<Box<dyn FusedStream<Item = (Token, S::Item)> + '_>> {
// This prevents the AtomicOneshotStream from handing out multiple Tokens.
let s = match *self.status.read() {
StreamStatus::Ready => Some(&mut self.stream),
// If the status indicates that an atomic operation is running, then the stream should
// not be processed.
StreamStatus::Halted => None,
};
// The `map` will never actually run in the event that s is None which prevents the Token
// state from overwriting itself. Rather, the None-case will trigger the fused
// `is_terminated` == true case and the stream will be perceived to be complete.
//
// The new token will never actually be created unless the underlying stream actually
// yields something which prevents the stream from accidentally deadlocking itself. It is
// guaranteed that there will only ever be one token. In order for `s` to be `Some` here,
// the current state must be `Ready` (ie: no token has been instantiated for this stream).
// If the underlying stream produces something, a `Token` instance is created and returned.
// During construction, the `Token` changes the state to `Halted`. A future attempt to
// poll this stream, returns `complete`. Future calls to `get_atomic_oneshot_stream` will
// result in `s` being `None` unless the previously-issued `Token` is dropped.
Box::pin(iter(s).flatten().map(|item| (Token::new(self.status.clone()), item)).fuse())
}
}
#[cfg(test)]
mod tests {
use {
super::*,
fuchsia_async as fasync,
futures::{channel::mpsc, future, select, stream::FuturesUnordered, task::Poll, FutureExt},
std::pin::pin,
wlan_common::assert_variant,
};
#[fuchsia::test]
fn test_atomic_stream() {
let mut exec = fasync::TestExecutor::new();
let (sender, receiver) = mpsc::unbounded();
let mut atomic_stream = AtomicOneshotStream::new(receiver);
// Send a message on the sender and observe that the AtomicOneshotStream can see it immediately.
sender.unbounded_send(()).expect("failed to send test message");
let token = {
let mut oneshot_stream = atomic_stream.get_atomic_oneshot_stream();
assert_variant!(
exec.run_until_stalled(&mut oneshot_stream.next()),
Poll::Ready(Some((token, ()))) => token
)
};
// Now hold onto the token and obseve that nothing can be received on the stream.
sender.unbounded_send(()).expect("failed to send test message");
{
let mut oneshot_stream = atomic_stream.get_atomic_oneshot_stream();
assert_variant!(exec.run_until_stalled(&mut oneshot_stream.next()), Poll::Ready(None),);
}
// Now drop the token and observe that the message comes through.
drop(token);
let mut oneshot_stream = atomic_stream.get_atomic_oneshot_stream();
assert_variant!(
exec.run_until_stalled(&mut oneshot_stream.next()),
Poll::Ready(Some((_, ())))
);
}
#[derive(Debug)]
enum SelectResult {
IntegerFutureReady(u8),
StreamReady,
}
fn select_helper(
exec: &mut fasync::TestExecutor,
fut: Pin<Box<dyn future::Future<Output = u8>>>,
oneshot_stream: &mut Pin<Box<dyn FusedStream<Item = (Token, ())> + '_>>,
) -> Poll<SelectResult> {
let mut futs = FuturesUnordered::new();
futs.push(fut);
let fut = async move {
select! {
integer = futs.select_next_some() => {
return SelectResult::IntegerFutureReady(integer)
},
(_token, ()) = oneshot_stream.select_next_some() => {
return SelectResult::StreamReady
}
}
};
let mut fut = pin!(fut);
exec.run_until_stalled(&mut fut)
}
#[fuchsia::test]
fn test_poll_complete_behavior() {
let mut exec = fasync::TestExecutor::new();
let (sender, receiver) = mpsc::unbounded();
let mut atomic_stream = AtomicOneshotStream::new(receiver);
// Nothing has been send on the sender, so this is polling the underlying stream which
// will produce pending. The ready future will return immediately.
let fut = future::ready(1).boxed();
let mut oneshot_stream = atomic_stream.get_atomic_oneshot_stream();
assert_variant!(
select_helper(&mut exec, fut, &mut oneshot_stream),
Poll::Ready(SelectResult::IntegerFutureReady(1)),
);
// Poll the underlying stream again to demonstrate that this behavior is safe.
let fut = future::ready(2).boxed();
assert_variant!(
select_helper(&mut exec, fut, &mut oneshot_stream),
Poll::Ready(SelectResult::IntegerFutureReady(2)),
);
// Now send something and let the select statement handle the stream.
let fut = future::pending().boxed();
sender.unbounded_send(()).expect("failed to send test message");
assert_variant!(
select_helper(&mut exec, fut, &mut oneshot_stream),
Poll::Ready(SelectResult::StreamReady),
);
// Poll a pending future and the now-complete stream to demonstrate that it is safe to poll
// again.
let fut = future::pending().boxed();
assert_variant!(select_helper(&mut exec, fut, &mut oneshot_stream), Poll::Pending,);
}
#[fuchsia::test]
fn test_token_sets_state() {
let _exec = fasync::TestExecutor::new();
let status = Arc::new(RwLock::new(StreamStatus::Ready));
// Creating the token should update the state to Running.
let token = Token::new(status.clone());
assert_eq!(*status.clone().read(), StreamStatus::Halted);
// Dropping the token should reset the state to NotRunning.
drop(token);
assert_eq!(*status.clone().read(), StreamStatus::Ready)
}
#[fuchsia::test]
fn test_operating_state() {
let mut exec = fasync::TestExecutor::new();
let (sender, receiver) = mpsc::unbounded();
let mut atomic_stream = AtomicOneshotStream::new(receiver);
let status = atomic_stream.status.clone();
// Verify that the atomic stream starts out in the NotRunning state.
assert_eq!(*atomic_stream.status.read(), StreamStatus::Ready);
// Create a oneshot stream and verify the status is still NotRunning.
{
let mut oneshot_stream = atomic_stream.get_atomic_oneshot_stream();
assert_eq!(*status.read(), StreamStatus::Ready);
// After learning that there is no event in the stream, the state should still be
// NotRunning.
assert_variant!(exec.run_until_stalled(&mut oneshot_stream.next()), Poll::Pending,);
assert_eq!(*status.read(), StreamStatus::Ready);
}
// Send a message on the sender and observe what happens.
sender.unbounded_send(()).expect("failed to send message");
{
// Initially the state is still NotRunning.
let mut oneshot_stream = atomic_stream.get_atomic_oneshot_stream();
assert_eq!(*status.read(), StreamStatus::Ready);
// But once the item is actually received, the state should change.
let token = assert_variant!(
exec.run_until_stalled(&mut oneshot_stream.next()),
Poll::Ready(Some((token, ()))) => token
);
assert_eq!(*status.read(), StreamStatus::Halted);
// Now drop the token and observe that the state goes back to NotRunning.
drop(token);
assert_eq!(*status.read(), StreamStatus::Ready);
}
}
}