event_queue/
barrier.rs
1use futures::channel::mpsc;
6use futures::never::Never;
7use futures::task::{Context, Poll};
8use futures::{Future, Stream};
9use std::pin::Pin;
10
11#[derive(Debug)]
13pub struct Barrier(mpsc::Receiver<Never>);
14
15#[derive(Debug, Clone)]
17pub struct BarrierBlock(#[expect(dead_code)] mpsc::Sender<Never>);
18
19impl Barrier {
20 pub fn new() -> (Self, BarrierBlock) {
24 let (send, recv) = mpsc::channel(0);
25 (Self(recv), BarrierBlock(send))
26 }
27}
28
29impl Future for Barrier {
30 type Output = ();
31
32 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
33 let recv = Pin::new(&mut self.get_mut().0);
34 recv.poll_next(cx).map(|_| ())
35 }
36}
37
#[cfg(test)]
mod tests {
    use super::*;
    use futures::prelude::*;

    /// With its only block already dropped, the barrier is ready immediately.
    #[test]
    fn drop_single_block_unblocks_barrier() {
        let (barrier, block) = Barrier::new();
        drop(block);

        assert_eq!(barrier.now_or_never(), Some(()));
    }

    /// A live block keeps the barrier pending; dropping it releases the barrier.
    #[test]
    fn single_block_blocks_barrier() {
        let mut exec = fuchsia_async::TestExecutor::new();
        let (mut barrier, block) = Barrier::new();

        assert!(exec.run_until_stalled(&mut barrier).is_pending());

        drop(block);
        assert_eq!(exec.run_until_stalled(&mut barrier), Poll::Ready(()));
    }

    /// A clone of the block holds the barrier closed just like the original.
    #[test]
    fn block_clone_blocks_barrier() {
        let mut exec = fuchsia_async::TestExecutor::new();
        let (mut barrier, original) = Barrier::new();
        let duplicate = original.clone();

        // Both handles alive.
        assert!(exec.run_until_stalled(&mut barrier).is_pending());

        // Only the clone remains, so the barrier must still be pending.
        drop(original);
        assert!(exec.run_until_stalled(&mut barrier).is_pending());

        // Last handle gone.
        drop(duplicate);
        assert_eq!(exec.run_until_stalled(&mut barrier), Poll::Ready(()));
    }
}