1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
// Copyright 2022 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use {
    fuchsia_sync::RwLock,
    futures::{
        stream::{iter, FusedStream},
        StreamExt,
    },
    std::{fmt::Debug, pin::Pin, sync::Arc},
};

/// Gate that records whether a new item may currently be handed out.
///
/// The enum is fieldless, so it is cheap to copy and has total equality; `Clone`, `Copy` and `Eq`
/// are derived alongside `Debug` and `PartialEq` for that reason.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum StreamStatus {
    /// A `Token` is outstanding; no further items should be produced until it is dropped.
    Halted,
    /// No `Token` is outstanding; the next item may be produced.
    Ready,
}

// Reference-counted, lock-guarded `StreamStatus`.  The same handle is held by an
// `AtomicOneshotStream` and cloned into each `Token` it issues, so both observe the same state.
type SharedAtomicOperationStatus = Arc<RwLock<StreamStatus>>;

/// Guard that marks an atomic operation as in progress for as long as it is alive.
///
/// The `Token` is the sole entity that is allowed to write the operation status.  On creation, it
/// sets the state to `Halted`.  On drop, it sets the state back to `Ready`.
#[derive(Debug)]
pub struct Token {
    // Status handle shared with whoever created this token (normally an `AtomicOneshotStream`).
    inner: SharedAtomicOperationStatus,
}

impl Token {
    /// Builds a token around the shared status, flagging it as `Halted` before returning.
    fn new(inner: SharedAtomicOperationStatus) -> Self {
        {
            // Scope the write guard so the lock is released before `inner` is moved into the
            // token.
            let mut status = inner.write();
            *status = StreamStatus::Halted;
        }
        Self { inner }
    }
}

impl Drop for Token {
    /// Ends the atomic operation by flipping the shared status back to `Ready`.
    fn drop(&mut self) {
        let mut status = self.inner.write();
        *status = StreamStatus::Ready;
    }
}

/// Wraps a stream together with a shared atomic-operation status that gates when items from that
/// stream are handed out.
///
/// The `AtomicOneshotStream` respects the state that has been set by its latest `Token` when
/// doling out new atomic streams to callers.  If the state is `Halted`, the caller is handed back
/// a stream that is complete.  If the state is `Ready`, a new token is created when the
/// underlying stream produces an item.  This token can be held by the caller to prevent the
/// `AtomicOneshotStream` from giving out non-complete streams.  Once the token is dropped, future
/// streams produced by the `AtomicOneshotStream` will be capable of producing items.
pub struct AtomicOneshotStream<S: FusedStream + Unpin> {
    // The underlying stream whose items are handed out.
    stream: S,
    // Status handle that is cloned into every `Token` created for an item of `stream`.
    status: SharedAtomicOperationStatus,
}

impl<S> AtomicOneshotStream<S>
where
    S: FusedStream + Unpin,
{
    /// Wraps `stream`, starting out in the `Ready` state (no `Token` outstanding).
    pub fn new(stream: S) -> Self {
        let status = Arc::new(RwLock::new(StreamStatus::Ready));
        Self { stream, status }
    }

    /// Returns a stream that yields at most one `(Token, item)` pair from the wrapped stream.
    ///
    /// * If the status is `Halted` when this method is called (a previously issued `Token` is
    ///   still alive), the returned stream is empty and completes on its first poll.
    /// * If the status is `Ready`, the returned stream polls the wrapped stream.  When an item
    ///   arrives, a new `Token` is created (switching the status to `Halted`) and returned with
    ///   the item.  The returned stream is then complete; call this method again after dropping
    ///   the `Token` to receive the next item.
    ///
    /// The returned stream mutably borrows `self`, so only one such stream can exist at a time.
    pub fn get_atomic_oneshot_stream(
        &mut self,
    ) -> Pin<Box<dyn FusedStream<Item = (Token, S::Item)> + '_>> {
        // This prevents the AtomicOneshotStream from handing out multiple Tokens.
        let s = match *self.status.read() {
            StreamStatus::Ready => Some(&mut self.stream),
            // If the status indicates that an atomic operation is running, then the stream should
            // not be processed.
            StreamStatus::Halted => None,
        };

        // The `map` will never actually run in the event that s is None which prevents the Token
        // state from overwriting itself.  Rather, the None-case yields an empty stream that is
        // perceived to be complete as soon as it is polled.
        //
        // The new token is only created once the underlying stream actually yields something, so
        // merely requesting or polling a oneshot stream never halts the AtomicOneshotStream.
        //
        // `take(1)` is what makes the returned stream "oneshot": without it, `flatten` would keep
        // polling the underlying stream after the first item, and polling the returned stream a
        // second time could create a second `Token` while the first is still alive.  Dropping
        // either token would then reset the status to `Ready` even though the other is still
        // held.  With `take(1)`, the stream completes after its single item, so at most one
        // `Token` is created per call to `get_atomic_oneshot_stream`, and further calls return an
        // empty stream until that `Token` is dropped.
        Box::pin(
            iter(s).flatten().take(1).map(|item| (Token::new(self.status.clone()), item)).fuse(),
        )
    }
}

// Unit tests for `Token` and `AtomicOneshotStream`, driven by a `fasync::TestExecutor` and an
// unbounded mpsc channel standing in for the underlying stream.
#[cfg(test)]
mod tests {
    use {
        super::*,
        fuchsia_async as fasync,
        futures::{channel::mpsc, future, select, stream::FuturesUnordered, task::Poll, FutureExt},
        std::pin::pin,
        wlan_common::assert_variant,
    };

    // Items are withheld while a token is held and delivered again once it is dropped.
    #[fuchsia::test]
    fn test_atomic_stream() {
        let mut exec = fasync::TestExecutor::new();
        let (sender, receiver) = mpsc::unbounded();
        let mut atomic_stream = AtomicOneshotStream::new(receiver);

        // Send a message on the sender and observe that the AtomicOneshotStream can see it immediately.
        sender.unbounded_send(()).expect("failed to send test message");
        let token = {
            let mut oneshot_stream = atomic_stream.get_atomic_oneshot_stream();
            assert_variant!(
                exec.run_until_stalled(&mut oneshot_stream.next()),
                Poll::Ready(Some((token, ()))) => token
            )
        };

        // Now hold onto the token and observe that nothing can be received on the stream.
        sender.unbounded_send(()).expect("failed to send test message");
        {
            let mut oneshot_stream = atomic_stream.get_atomic_oneshot_stream();
            assert_variant!(exec.run_until_stalled(&mut oneshot_stream.next()), Poll::Ready(None),);
        }

        // Now drop the token and observe that the message comes through.
        drop(token);
        let mut oneshot_stream = atomic_stream.get_atomic_oneshot_stream();
        assert_variant!(
            exec.run_until_stalled(&mut oneshot_stream.next()),
            Poll::Ready(Some((_, ())))
        );
    }

    // Identifies which branch of the `select!` in `select_helper` completed.
    #[derive(Debug)]
    enum SelectResult {
        IntegerFutureReady(u8),
        StreamReady,
    }

    // Races `fut` against the next item of `oneshot_stream` in a single `select!` and runs that
    // until it stalls, reporting which branch (if any) completed.
    fn select_helper(
        exec: &mut fasync::TestExecutor,
        fut: Pin<Box<dyn future::Future<Output = u8>>>,
        oneshot_stream: &mut Pin<Box<dyn FusedStream<Item = (Token, ())> + '_>>,
    ) -> Poll<SelectResult> {
        let mut futs = FuturesUnordered::new();
        futs.push(fut);

        let fut = async move {
            select! {
                integer = futs.select_next_some() => {
                    return SelectResult::IntegerFutureReady(integer)
                },
                (_token, ()) = oneshot_stream.select_next_some() => {
                    return SelectResult::StreamReady
                }
            }
        };

        let mut fut = pin!(fut);
        exec.run_until_stalled(&mut fut)
    }

    // The same oneshot stream can be polled repeatedly inside `select!`, both before and after
    // it has produced its item.
    #[fuchsia::test]
    fn test_poll_complete_behavior() {
        let mut exec = fasync::TestExecutor::new();
        let (sender, receiver) = mpsc::unbounded();
        let mut atomic_stream = AtomicOneshotStream::new(receiver);

        // Nothing has been sent on the sender, so this is polling the underlying stream which
        // will produce pending.  The ready future will return immediately.
        let fut = future::ready(1).boxed();
        let mut oneshot_stream = atomic_stream.get_atomic_oneshot_stream();
        assert_variant!(
            select_helper(&mut exec, fut, &mut oneshot_stream),
            Poll::Ready(SelectResult::IntegerFutureReady(1)),
        );

        // Poll the underlying stream again to demonstrate that this behavior is safe.
        let fut = future::ready(2).boxed();
        assert_variant!(
            select_helper(&mut exec, fut, &mut oneshot_stream),
            Poll::Ready(SelectResult::IntegerFutureReady(2)),
        );

        // Now send something and let the select statement handle the stream.
        let fut = future::pending().boxed();
        sender.unbounded_send(()).expect("failed to send test message");
        assert_variant!(
            select_helper(&mut exec, fut, &mut oneshot_stream),
            Poll::Ready(SelectResult::StreamReady),
        );

        // Poll a pending future and the oneshot stream again, after its only item has been
        // consumed, to demonstrate that it is safe to poll again.
        let fut = future::pending().boxed();
        assert_variant!(select_helper(&mut exec, fut, &mut oneshot_stream), Poll::Pending,);
    }

    // A `Token` writes `Halted` on creation and `Ready` on drop.
    #[fuchsia::test]
    fn test_token_sets_state() {
        let _exec = fasync::TestExecutor::new();

        let status = Arc::new(RwLock::new(StreamStatus::Ready));

        // Creating the token should update the state to `Halted`.
        let token = Token::new(status.clone());
        assert_eq!(*status.clone().read(), StreamStatus::Halted);

        // Dropping the token should reset the state to `Ready`.
        drop(token);
        assert_eq!(*status.clone().read(), StreamStatus::Ready)
    }

    // The shared status only changes when an item is actually received and when its token is
    // dropped.
    #[fuchsia::test]
    fn test_operating_state() {
        let mut exec = fasync::TestExecutor::new();

        let (sender, receiver) = mpsc::unbounded();
        let mut atomic_stream = AtomicOneshotStream::new(receiver);
        let status = atomic_stream.status.clone();

        // Verify that the atomic stream starts out in the `Ready` state.
        assert_eq!(*atomic_stream.status.read(), StreamStatus::Ready);

        // Create a oneshot stream and verify the status is still `Ready`.
        {
            let mut oneshot_stream = atomic_stream.get_atomic_oneshot_stream();
            assert_eq!(*status.read(), StreamStatus::Ready);

            // After learning that there is no event in the stream, the state should still be
            // `Ready`.
            assert_variant!(exec.run_until_stalled(&mut oneshot_stream.next()), Poll::Pending,);
            assert_eq!(*status.read(), StreamStatus::Ready);
        }

        // Send a message on the sender and observe what happens.
        sender.unbounded_send(()).expect("failed to send message");
        {
            // Initially the state is still `Ready`.
            let mut oneshot_stream = atomic_stream.get_atomic_oneshot_stream();
            assert_eq!(*status.read(), StreamStatus::Ready);

            // But once the item is actually received, the state should change to `Halted`.
            let token = assert_variant!(
                exec.run_until_stalled(&mut oneshot_stream.next()),
                Poll::Ready(Some((token, ()))) => token
            );
            assert_eq!(*status.read(), StreamStatus::Halted);

            // Now drop the token and observe that the state goes back to `Ready`.
            drop(token);
            assert_eq!(*status.read(), StreamStatus::Ready);
        }
    }
}