crossbeam_utils/
backoff.rs

1use crate::primitive::sync::atomic;
2use core::cell::Cell;
3use core::fmt;
4
5const SPIN_LIMIT: u32 = 6;
6const YIELD_LIMIT: u32 = 10;
7
8/// Performs exponential backoff in spin loops.
9///
10/// Backing off in spin loops reduces contention and improves overall performance.
11///
12/// This primitive can execute *YIELD* and *PAUSE* instructions, yield the current thread to the OS
13/// scheduler, and tell when is a good time to block the thread using a different synchronization
14/// mechanism. Each step of the back off procedure takes roughly twice as long as the previous
15/// step.
16///
17/// # Examples
18///
19/// Backing off in a lock-free loop:
20///
21/// ```
22/// use crossbeam_utils::Backoff;
23/// use std::sync::atomic::AtomicUsize;
24/// use std::sync::atomic::Ordering::SeqCst;
25///
26/// fn fetch_mul(a: &AtomicUsize, b: usize) -> usize {
27///     let backoff = Backoff::new();
28///     loop {
29///         let val = a.load(SeqCst);
30///         if a.compare_exchange(val, val.wrapping_mul(b), SeqCst, SeqCst).is_ok() {
31///             return val;
32///         }
33///         backoff.spin();
34///     }
35/// }
36/// ```
37///
38/// Waiting for an [`AtomicBool`] to become `true`:
39///
40/// ```
41/// use crossbeam_utils::Backoff;
42/// use std::sync::atomic::AtomicBool;
43/// use std::sync::atomic::Ordering::SeqCst;
44///
45/// fn spin_wait(ready: &AtomicBool) {
46///     let backoff = Backoff::new();
47///     while !ready.load(SeqCst) {
48///         backoff.snooze();
49///     }
50/// }
51/// ```
52///
53/// Waiting for an [`AtomicBool`] to become `true` and parking the thread after a long wait.
54/// Note that whoever sets the atomic variable to `true` must notify the parked thread by calling
55/// [`unpark()`]:
56///
57/// ```
58/// use crossbeam_utils::Backoff;
59/// use std::sync::atomic::AtomicBool;
60/// use std::sync::atomic::Ordering::SeqCst;
61/// use std::thread;
62///
63/// fn blocking_wait(ready: &AtomicBool) {
64///     let backoff = Backoff::new();
65///     while !ready.load(SeqCst) {
66///         if backoff.is_completed() {
67///             thread::park();
68///         } else {
69///             backoff.snooze();
70///         }
71///     }
72/// }
73/// ```
74///
75/// [`is_completed`]: Backoff::is_completed
76/// [`std::thread::park()`]: std::thread::park
77/// [`Condvar`]: std::sync::Condvar
78/// [`AtomicBool`]: std::sync::atomic::AtomicBool
79/// [`unpark()`]: std::thread::Thread::unpark
80pub struct Backoff {
81    step: Cell<u32>,
82}
83
84impl Backoff {
85    /// Creates a new `Backoff`.
86    ///
87    /// # Examples
88    ///
89    /// ```
90    /// use crossbeam_utils::Backoff;
91    ///
92    /// let backoff = Backoff::new();
93    /// ```
94    #[inline]
95    pub fn new() -> Self {
96        Backoff { step: Cell::new(0) }
97    }
98
99    /// Resets the `Backoff`.
100    ///
101    /// # Examples
102    ///
103    /// ```
104    /// use crossbeam_utils::Backoff;
105    ///
106    /// let backoff = Backoff::new();
107    /// backoff.reset();
108    /// ```
109    #[inline]
110    pub fn reset(&self) {
111        self.step.set(0);
112    }
113
114    /// Backs off in a lock-free loop.
115    ///
116    /// This method should be used when we need to retry an operation because another thread made
117    /// progress.
118    ///
119    /// The processor may yield using the *YIELD* or *PAUSE* instruction.
120    ///
121    /// # Examples
122    ///
123    /// Backing off in a lock-free loop:
124    ///
125    /// ```
126    /// use crossbeam_utils::Backoff;
127    /// use std::sync::atomic::AtomicUsize;
128    /// use std::sync::atomic::Ordering::SeqCst;
129    ///
130    /// fn fetch_mul(a: &AtomicUsize, b: usize) -> usize {
131    ///     let backoff = Backoff::new();
132    ///     loop {
133    ///         let val = a.load(SeqCst);
134    ///         if a.compare_exchange(val, val.wrapping_mul(b), SeqCst, SeqCst).is_ok() {
135    ///             return val;
136    ///         }
137    ///         backoff.spin();
138    ///     }
139    /// }
140    ///
141    /// let a = AtomicUsize::new(7);
142    /// assert_eq!(fetch_mul(&a, 8), 7);
143    /// assert_eq!(a.load(SeqCst), 56);
144    /// ```
145    #[inline]
146    pub fn spin(&self) {
147        for _ in 0..1 << self.step.get().min(SPIN_LIMIT) {
148            // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+,
149            // use [`core::hint::spin_loop`] instead.
150            #[allow(deprecated)]
151            atomic::spin_loop_hint();
152        }
153
154        if self.step.get() <= SPIN_LIMIT {
155            self.step.set(self.step.get() + 1);
156        }
157    }
158
159    /// Backs off in a blocking loop.
160    ///
161    /// This method should be used when we need to wait for another thread to make progress.
162    ///
163    /// The processor may yield using the *YIELD* or *PAUSE* instruction and the current thread
164    /// may yield by giving up a timeslice to the OS scheduler.
165    ///
166    /// In `#[no_std]` environments, this method is equivalent to [`spin`].
167    ///
168    /// If possible, use [`is_completed`] to check when it is advised to stop using backoff and
169    /// block the current thread using a different synchronization mechanism instead.
170    ///
171    /// [`spin`]: Backoff::spin
172    /// [`is_completed`]: Backoff::is_completed
173    ///
174    /// # Examples
175    ///
176    /// Waiting for an [`AtomicBool`] to become `true`:
177    ///
178    /// ```
179    /// use crossbeam_utils::Backoff;
180    /// use std::sync::Arc;
181    /// use std::sync::atomic::AtomicBool;
182    /// use std::sync::atomic::Ordering::SeqCst;
183    /// use std::thread;
184    /// use std::time::Duration;
185    ///
186    /// fn spin_wait(ready: &AtomicBool) {
187    ///     let backoff = Backoff::new();
188    ///     while !ready.load(SeqCst) {
189    ///         backoff.snooze();
190    ///     }
191    /// }
192    ///
193    /// let ready = Arc::new(AtomicBool::new(false));
194    /// let ready2 = ready.clone();
195    ///
196    /// thread::spawn(move || {
197    ///     thread::sleep(Duration::from_millis(100));
198    ///     ready2.store(true, SeqCst);
199    /// });
200    ///
201    /// assert_eq!(ready.load(SeqCst), false);
202    /// spin_wait(&ready);
203    /// assert_eq!(ready.load(SeqCst), true);
204    /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
205    /// ```
206    ///
207    /// [`AtomicBool`]: std::sync::atomic::AtomicBool
208    #[inline]
209    pub fn snooze(&self) {
210        if self.step.get() <= SPIN_LIMIT {
211            for _ in 0..1 << self.step.get() {
212                // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+,
213                // use [`core::hint::spin_loop`] instead.
214                #[allow(deprecated)]
215                atomic::spin_loop_hint();
216            }
217        } else {
218            #[cfg(not(feature = "std"))]
219            for _ in 0..1 << self.step.get() {
220                // TODO(taiki-e): once we bump the minimum required Rust version to 1.49+,
221                // use [`core::hint::spin_loop`] instead.
222                #[allow(deprecated)]
223                atomic::spin_loop_hint();
224            }
225
226            #[cfg(feature = "std")]
227            ::std::thread::yield_now();
228        }
229
230        if self.step.get() <= YIELD_LIMIT {
231            self.step.set(self.step.get() + 1);
232        }
233    }
234
235    /// Returns `true` if exponential backoff has completed and blocking the thread is advised.
236    ///
237    /// # Examples
238    ///
239    /// Waiting for an [`AtomicBool`] to become `true` and parking the thread after a long wait:
240    ///
241    /// ```
242    /// use crossbeam_utils::Backoff;
243    /// use std::sync::Arc;
244    /// use std::sync::atomic::AtomicBool;
245    /// use std::sync::atomic::Ordering::SeqCst;
246    /// use std::thread;
247    /// use std::time::Duration;
248    ///
249    /// fn blocking_wait(ready: &AtomicBool) {
250    ///     let backoff = Backoff::new();
251    ///     while !ready.load(SeqCst) {
252    ///         if backoff.is_completed() {
253    ///             thread::park();
254    ///         } else {
255    ///             backoff.snooze();
256    ///         }
257    ///     }
258    /// }
259    ///
260    /// let ready = Arc::new(AtomicBool::new(false));
261    /// let ready2 = ready.clone();
262    /// let waiter = thread::current();
263    ///
264    /// thread::spawn(move || {
265    ///     thread::sleep(Duration::from_millis(100));
266    ///     ready2.store(true, SeqCst);
267    ///     waiter.unpark();
268    /// });
269    ///
270    /// assert_eq!(ready.load(SeqCst), false);
271    /// blocking_wait(&ready);
272    /// assert_eq!(ready.load(SeqCst), true);
273    /// # std::thread::sleep(std::time::Duration::from_millis(500)); // wait for background threads closed: https://github.com/rust-lang/miri/issues/1371
274    /// ```
275    ///
276    /// [`AtomicBool`]: std::sync::atomic::AtomicBool
277    #[inline]
278    pub fn is_completed(&self) -> bool {
279        self.step.get() > YIELD_LIMIT
280    }
281}
282
283impl fmt::Debug for Backoff {
284    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
285        f.debug_struct("Backoff")
286            .field("step", &self.step)
287            .field("is_completed", &self.is_completed())
288            .finish()
289    }
290}
291
292impl Default for Backoff {
293    fn default() -> Backoff {
294        Backoff::new()
295    }
296}