// futures_util/stream/futures_unordered/iter.rs

use super::task::Task;
use super::FuturesUnordered;
use core::marker::PhantomData;
use core::pin::Pin;
use core::ptr;
use core::sync::atomic::Ordering::Relaxed;

/// Mutable iterator over all futures in the unordered set.
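///
/// A minimal usage sketch, assuming the `futures` facade crate is available
/// (as in the crate's other doc examples); the iterator is obtained through
/// `FuturesUnordered::iter_pin_mut` on a pinned set:
///
/// ```
/// use core::pin::Pin;
/// use futures::future::{ready, Ready};
/// use futures::stream::FuturesUnordered;
///
/// let mut set: FuturesUnordered<Ready<u8>> = FuturesUnordered::new();
/// set.push(ready(1));
/// set.push(ready(2));
///
/// // `FuturesUnordered` is `Unpin`, so `Pin::new` suffices here.
/// let iter = Pin::new(&mut set).iter_pin_mut();
/// assert_eq!(iter.len(), 2);
/// ```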
#[derive(Debug)]
pub struct IterPinMut<'a, Fut> {
    // Next task to visit in the `head_all` list.
    pub(super) task: *const Task<Fut>,
    // Number of futures remaining in the iteration.
    pub(super) len: usize,
    pub(super) _marker: PhantomData<&'a mut FuturesUnordered<Fut>>,
}

/// Mutable iterator over all futures in the unordered set.
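///
/// A minimal usage sketch, assuming the `futures` facade crate (the iterator
/// comes from `FuturesUnordered::iter_mut`, which requires `Fut: Unpin`):
///
/// ```
/// use futures::future::{ready, Ready};
/// use futures::stream::FuturesUnordered;
///
/// let mut set: FuturesUnordered<Ready<u8>> = FuturesUnordered::new();
/// set.push(ready(1));
///
/// // `Ready<u8>: Unpin`, so plain `&mut` access to each future is sound.
/// for fut in set.iter_mut() {
///     let _: &mut Ready<u8> = fut;
/// }
/// ```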
#[derive(Debug)]
pub struct IterMut<'a, Fut: Unpin>(pub(super) IterPinMut<'a, Fut>);

/// Immutable iterator over all futures in the unordered set.
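///
/// A minimal usage sketch, assuming the `futures` facade crate (the iterator
/// comes from `FuturesUnordered::iter_pin_ref` on a pinned shared reference):
///
/// ```
/// use core::pin::Pin;
/// use futures::future::{ready, Ready};
/// use futures::stream::FuturesUnordered;
///
/// let set: FuturesUnordered<Ready<u8>> = FuturesUnordered::new();
/// set.push(ready(1));
/// set.push(ready(2));
///
/// let iter = Pin::new(&set).iter_pin_ref();
/// assert_eq!(iter.len(), 2);
/// ```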
#[derive(Debug)]
pub struct IterPinRef<'a, Fut> {
    // Next task to visit in the `head_all` list.
    pub(super) task: *const Task<Fut>,
    // Number of futures remaining in the iteration.
    pub(super) len: usize,
    // Sentinel `next_all` value marking a task whose insertion into the list
    // has not completed yet; `spin_next_all` spins while it reads this value.
    pub(super) pending_next_all: *mut Task<Fut>,
    pub(super) _marker: PhantomData<&'a FuturesUnordered<Fut>>,
}

/// Immutable iterator over all futures in the unordered set.
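///
/// A minimal usage sketch, assuming the `futures` facade crate (the iterator
/// comes from `FuturesUnordered::iter`, which requires `Fut: Unpin`):
///
/// ```
/// use futures::future::ready;
/// use futures::stream::FuturesUnordered;
///
/// let set = FuturesUnordered::new();
/// set.push(ready(1u8));
/// set.push(ready(2u8));
///
/// assert_eq!(set.iter().count(), 2);
/// ```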
#[derive(Debug)]
pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>);

/// Owned iterator over all futures in the unordered set.
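///
/// A minimal usage sketch, assuming the `futures` facade crate (the iterator
/// comes from the `IntoIterator` impl and yields the futures themselves):
///
/// ```
/// use futures::future::ready;
/// use futures::stream::FuturesUnordered;
///
/// let set = FuturesUnordered::new();
/// set.push(ready(1u8));
/// set.push(ready(2u8));
///
/// // Consuming the set hands back the remaining futures, in no
/// // particular order.
/// let futures: Vec<_> = set.into_iter().collect();
/// assert_eq!(futures.len(), 2);
/// ```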
#[derive(Debug)]
pub struct IntoIter<Fut: Unpin> {
    // Number of futures remaining in the set.
    pub(super) len: usize,
    pub(super) inner: FuturesUnordered<Fut>,
}

impl<Fut: Unpin> Iterator for IntoIter<Fut> {
    type Item = Fut;

    fn next(&mut self) -> Option<Self::Item> {
        // `head_all` can be accessed directly and we don't need to spin on
        // `Task::next_all` since we have exclusive access to the set.
        let task = self.inner.head_all.get_mut();

        if (*task).is_null() {
            return None;
        }

        unsafe {
            // Moving out of the future is safe because it is `Unpin`
            let future = (*(**task).future.get()).take().unwrap();

            // Mutable access to a previously shared `FuturesUnordered` implies
            // that the other threads already released the object before the
            // current thread acquired it, so relaxed ordering can be used and
            // valid `next_all` checks can be skipped.
            let next = (**task).next_all.load(Relaxed);
            *task = next;
            if !task.is_null() {
                // The new head, if any, no longer has a predecessor.
                *(**task).prev_all.get() = ptr::null_mut();
            }
            self.len -= 1;
            Some(future)
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.len, Some(self.len))
    }
}

impl<Fut: Unpin> ExactSizeIterator for IntoIter<Fut> {}

impl<'a, Fut> Iterator for IterPinMut<'a, Fut> {
    type Item = Pin<&'a mut Fut>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.task.is_null() {
            return None;
        }

        unsafe {
            let future = (*(*self.task).future.get()).as_mut().unwrap();

            // Mutable access to a previously shared `FuturesUnordered` implies
            // that the other threads already released the object before the
            // current thread acquired it, so relaxed ordering can be used and
            // valid `next_all` checks can be skipped.
            let next = (*self.task).next_all.load(Relaxed);
            self.task = next;
            self.len -= 1;
            // The future stays pinned inside the set for as long as the set
            // itself is, so handing out a pinned reference is sound.
            Some(Pin::new_unchecked(future))
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.len, Some(self.len))
    }
}

impl<Fut> ExactSizeIterator for IterPinMut<'_, Fut> {}

impl<'a, Fut: Unpin> Iterator for IterMut<'a, Fut> {
    type Item = &'a mut Fut;

    fn next(&mut self) -> Option<Self::Item> {
        self.0.next().map(Pin::get_mut)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.0.size_hint()
    }
}

impl<Fut: Unpin> ExactSizeIterator for IterMut<'_, Fut> {}

impl<'a, Fut> Iterator for IterPinRef<'a, Fut> {
    type Item = Pin<&'a Fut>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.task.is_null() {
            return None;
        }

        unsafe {
            let future = (*(*self.task).future.get()).as_ref().unwrap();

            // Relaxed ordering can be used since acquire ordering when
            // `head_all` was initially read for this iterator implies acquire
            // ordering for all previously inserted nodes (and we don't need to
            // read `len_all` again for any other nodes).
            let next = (*self.task).spin_next_all(self.pending_next_all, Relaxed);
            self.task = next;
            self.len -= 1;
            Some(Pin::new_unchecked(future))
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.len, Some(self.len))
    }
}

impl<Fut> ExactSizeIterator for IterPinRef<'_, Fut> {}

impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> {
    type Item = &'a Fut;

    fn next(&mut self) -> Option<Self::Item> {
        self.0.next().map(Pin::get_ref)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.0.size_hint()
    }
}

impl<Fut: Unpin> ExactSizeIterator for Iter<'_, Fut> {}

// SAFETY: we do nothing thread-local and there is no interior mutability,
// so the usual structural `Send`/`Sync` rules apply.
unsafe impl<Fut: Send> Send for IterPinRef<'_, Fut> {}
unsafe impl<Fut: Sync> Sync for IterPinRef<'_, Fut> {}

unsafe impl<Fut: Send> Send for IterPinMut<'_, Fut> {}
unsafe impl<Fut: Sync> Sync for IterPinMut<'_, Fut> {}

unsafe impl<Fut: Send + Unpin> Send for IntoIter<Fut> {}
unsafe impl<Fut: Sync + Unpin> Sync for IntoIter<Fut> {}