event_queue/
barrier.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use futures::channel::mpsc;
6use futures::never::Never;
7use futures::task::{Context, Poll};
8use futures::{Future, Stream};
9use std::pin::Pin;
10
11/// A barrier allows an async task to wait for all blocks to be dropped.
12#[derive(Debug)]
13pub struct Barrier(mpsc::Receiver<Never>);
14
15/// Any clone of a barrier block prevents the associated [`Barrier`] future from completing.
16#[derive(Debug, Clone)]
17pub struct BarrierBlock(#[expect(dead_code)] mpsc::Sender<Never>);
18
19impl Barrier {
20    /// Creates a new barrier and associated blocker.
21    ///
22    /// The future that is [`Barrier`] resolves when all clones of [`BarrierBlock`] are dropped.
23    pub fn new() -> (Self, BarrierBlock) {
24        let (send, recv) = mpsc::channel(0);
25        (Self(recv), BarrierBlock(send))
26    }
27}
28
29impl Future for Barrier {
30    type Output = ();
31
32    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
33        let recv = Pin::new(&mut self.get_mut().0);
34        recv.poll_next(cx).map(|_| ())
35    }
36}
37
38#[cfg(test)]
39mod tests {
40    use super::*;
41    use futures::prelude::*;
42
43    #[test]
44    fn drop_single_block_unblocks_barrier() {
45        let (barrier, _) = Barrier::new();
46        assert_eq!(barrier.now_or_never(), Some(()));
47    }
48
49    #[test]
50    fn single_block_blocks_barrier() {
51        let mut executor = fuchsia_async::TestExecutor::new();
52
53        let (mut barrier, block) = Barrier::new();
54        assert_eq!(executor.run_until_stalled(&mut barrier), Poll::Pending);
55
56        drop(block);
57        assert_eq!(executor.run_until_stalled(&mut barrier), Poll::Ready(()));
58    }
59
60    #[test]
61    fn block_clone_blocks_barrier() {
62        let mut executor = fuchsia_async::TestExecutor::new();
63
64        let (mut barrier, block) = Barrier::new();
65        let block_clone = block.clone();
66        assert_eq!(executor.run_until_stalled(&mut barrier), Poll::Pending);
67
68        drop(block);
69        assert_eq!(executor.run_until_stalled(&mut barrier), Poll::Pending);
70
71        drop(block_clone);
72        assert_eq!(executor.run_until_stalled(&mut barrier), Poll::Ready(()));
73    }
74}