fuchsia_backoff/
lib.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fuchsia_async::{self as fasync, DurationExt};
6use futures::Future;
7use std::time::Duration;
8
9/// Execute the async task. If it errs out, attempt to run the task again after a delay if the
10/// `backoff` yields a duration. Otherwise, return the first error that occurred to the caller.
11///
12/// # Examples
13///
14/// `retry_or_first_error` will succeed if the task returns `Ok` before the `backoff` returns None.
15///
16/// ```
17/// # use std::iter::repeat;
18/// # use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
19/// let counter = Arc::new(AtomicUsize::new(0));
20/// let result = retry_or_first_error(
21///     repeat(Duration::from_secs(1)),
22///     || async {
23///         let count = counter.fetch_add(1, Ordering::SeqCst);
24///         if count == 5 {
25///             Ok(count)
26///         } else {
27///             Err(count)
28///         }
29///     }),
30/// ).await;
31/// assert_eq!(result, Ok(5));
32/// ```
33///
34/// If the task fails, the `retry_or_first_error` will return the first error.
35///
36/// ```
37/// # use std::iter::repeat;
38/// # use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
39/// let counter = Arc::new(AtomicUsize::new(0));
40/// let result = retry_or_first_error(
41///     repeat(Duration::from_secs(1)).take(5),
42///     || async {
43///         let count = counter.fetch_add(1, Ordering::SeqCst);
44///         Err(count)
45///     }),
46/// ).await;
47/// assert_eq!(result, Err(0));
48/// ```
49pub async fn retry_or_first_error<B, T>(mut backoff: B, task: T) -> Result<T::Ok, T::Error>
50where
51    B: Backoff<T::Error>,
52    T: Task,
53{
54    match next(&mut backoff, task).await {
55        Ok(value) => Ok(value),
56        Err((err, None)) => Err(err),
57        Err((err, Some(task))) => match retry_or_last_error(backoff, task).await {
58            Ok(value) => Ok(value),
59            Err(_) => Err(err),
60        },
61    }
62}
63
64/// Execute the async task. If it errs out, attempt to run the task again after a delay if the
65/// `backoff` yields a duration. Otherwise, return the last error that occurred to the caller.
66///
67/// # Examples
68///
69/// `retry_or_last_error` will succeed if the task returns `Ok` before the `backoff` returns None.
70///
71/// ```
72/// # use std::iter::repeat;
73/// # use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
74/// let counter = Arc::new(AtomicUsize::new(0));
75/// let result = retry_or_last_error(
76///     repeat(Duration::from_secs(1)),
77///     || async {
78///         let count = counter.fetch_add(1, Ordering::SeqCst);
79///         if count == 5 {
80///             Ok(count)
81///         } else {
82///             Err(count)
83///         }
84///     }),
85/// ).await;
86/// assert_eq!(result, Ok(5));
87/// ```
88///
89/// If the task fails, the `retry_or_last_error` will return the last error.
90///
91/// ```
92/// # use std::iter::repeat;
93/// # use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
94/// let counter = Arc::new(AtomicUsize::new(0));
95/// let result = retry_or_last_error(
96///     repeat(Duration::from_secs(1)).take(5),
97///     || async {
98///         let count = counter.fetch_add(1, Ordering::SeqCst);
99///         Err(count)
100///     }),
101/// ).await;
102/// assert_eq!(result, Err(5));
103/// ```
104pub async fn retry_or_last_error<B, T>(mut backoff: B, mut task: T) -> Result<T::Ok, T::Error>
105where
106    B: Backoff<T::Error>,
107    T: Task,
108{
109    loop {
110        match next(&mut backoff, task).await {
111            Ok(value) => {
112                return Ok(value);
113            }
114            Err((err, None)) => {
115                return Err(err);
116            }
117            Err((_, Some(next))) => {
118                task = next;
119            }
120        }
121    }
122}
123
124/// Execute the async task. If it errs out, attempt to run the task again after a delay if the
125/// `backoff` yields a duration. Otherwise, collect all the errors and return them to the caller.
126///
127/// # Examples
128///
129/// `retry_or_last_error` will succeed if it returns `Ok` before the `backoff` returns None.
130///
131/// ```
132/// # use std::iter::repeat;
133/// # use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
134/// let counter = Arc::new(AtomicUsize::new(0));
135/// let result = retry_or_collect_errors(
136///     repeat(Duration::from_secs(1)),
137///     || async {
138///         let count = counter.fetch_add(1, Ordering::SeqCst);
139///         if count == 5 {
140///             Ok(count)
141///         } else {
142///             Err(count)
143///         }
144///     }),
145/// ).await;
146/// assert_eq!(result, Ok(5));
147/// ```
148///
149/// If the task fails, it will return all the errors.
150///
151/// ```
152/// # use std::iter::repeat;
153/// # use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
154/// let counter = Arc::new(AtomicUsize::new(0));
155/// let result = retry_or_last_error(
156///     repeat(Duration::from_secs(1)).take(5),
157///     || async {
158///         let count = counter.fetch_add(1, Ordering::SeqCst);
159///         Err(count)
160///     }),
161/// ).await;
162/// assert_eq!(result, Err(vec![0, 1, 2, 3, 4]));
163/// ```
164pub async fn retry_or_collect_errors<B, T, C>(mut backoff: B, mut task: T) -> Result<T::Ok, C>
165where
166    B: Backoff<T::Error>,
167    T: Task,
168    C: Default + Extend<T::Error>,
169{
170    let mut collection = C::default();
171    loop {
172        match next(&mut backoff, task).await {
173            Ok(value) => {
174                return Ok(value);
175            }
176            Err((err, next)) => {
177                collection.extend(Some(err));
178                match next {
179                    Some(next) => {
180                        task = next;
181                    }
182                    None => {
183                        return Err(collection);
184                    }
185                }
186            }
187        }
188    }
189}
190
191async fn next<B, T>(backoff: &mut B, mut task: T) -> Result<T::Ok, (T::Error, Option<T>)>
192where
193    B: Backoff<T::Error>,
194    T: Task,
195{
196    match task.run().await {
197        Ok(value) => Ok(value),
198        Err(err) => match backoff.next_backoff(&err) {
199            Some(delay) => {
200                let delay = delay.after_now();
201                fasync::Timer::new(delay).await;
202                Err((err, Some(task)))
203            }
204            None => Err((err, None)),
205        },
206    }
207}
208
209/// A task produces an asynchronous computation that can be retried if the returned future fails
210/// with some error.
211pub trait Task {
212    /// The type of successful values yielded by the task future.
213    type Ok;
214
215    /// The type of failures yielded by the task future.
216    type Error;
217
218    /// The future returned when executing this task.
219    type Future: Future<Output = Result<Self::Ok, Self::Error>>;
220
221    /// Return a future.
222    fn run(&mut self) -> Self::Future;
223}
224
225impl<F, Fut, T, E> Task for F
226where
227    F: FnMut() -> Fut,
228    Fut: Future<Output = Result<T, E>>,
229{
230    type Ok = T;
231    type Error = E;
232    type Future = Fut;
233
234    fn run(&mut self) -> Self::Future {
235        (self)()
236    }
237}
238
239/// A backoff policy for deciding to retry an operation.
240pub trait Backoff<E> {
241    fn next_backoff(&mut self, err: &E) -> Option<Duration>;
242}
243
244impl<E, I> Backoff<E> for I
245where
246    I: Iterator<Item = Duration>,
247{
248    fn next_backoff(&mut self, _: &E) -> Option<Duration> {
249        self.next()
250    }
251}
252
253#[cfg(test)]
254mod tests {
255    use super::*;
256    use futures::prelude::*;
257    use std::iter;
258    use std::sync::atomic::{AtomicUsize, Ordering};
259    use std::sync::Arc;
260
261    const BACKOFF_DURATION: u64 = 100;
262
263    #[derive(Clone)]
264    struct Counter {
265        counter: Arc<AtomicUsize>,
266        ok_at: Option<usize>,
267    }
268
269    impl Counter {
270        fn ok_at(count: usize) -> Self {
271            Counter { counter: Arc::new(AtomicUsize::new(0)), ok_at: Some(count) }
272        }
273
274        fn never_ok() -> Self {
275            Counter { counter: Arc::new(AtomicUsize::new(0)), ok_at: None }
276        }
277    }
278
279    impl Task for Counter {
280        type Ok = usize;
281        type Error = usize;
282        type Future = future::Ready<Result<usize, usize>>;
283
284        fn run(&mut self) -> Self::Future {
285            let count = self.counter.fetch_add(1, Ordering::SeqCst);
286            match self.ok_at {
287                Some(ok_at) if ok_at == count => future::ready(Ok(count)),
288                _ => future::ready(Err(count)),
289            }
290        }
291    }
292
293    #[cfg(target_os = "fuchsia")]
294    fn run<F>(future: F, pending_count: usize) -> F::Output
295    where
296        F: Future,
297        F::Output: std::fmt::Debug + PartialEq + Eq,
298    {
299        use std::pin::pin;
300        use std::task::Poll;
301
302        let mut future = pin!(future);
303        let mut executor = fasync::TestExecutor::new_with_fake_time();
304
305        for _ in 0..pending_count {
306            assert_eq!(executor.run_until_stalled(&mut future), Poll::Pending);
307            assert_eq!(executor.wake_expired_timers(), false);
308            executor.set_fake_time(Duration::from_millis(BACKOFF_DURATION).after_now());
309            assert_eq!(executor.wake_expired_timers(), true);
310        }
311
312        match executor.run_until_stalled(&mut future) {
313            Poll::Ready(value) => value,
314            Poll::Pending => panic!("expected future to be ready"),
315        }
316    }
317
318    #[cfg(not(target_os = "fuchsia"))]
319    fn run<F>(future: F, pending_count: usize) -> F::Output
320    where
321        F: Future,
322        F::Output: std::fmt::Debug + PartialEq + Eq,
323    {
324        // The host-side executor doesn't support fake time, so just run the future and make sure
325        // it ran longer than the backoff duration.
326        let mut executor = fasync::LocalExecutor::new();
327
328        let start_time = fasync::MonotonicInstant::now();
329
330        let result = executor.run_singlethreaded(future);
331
332        let actual_duration = fasync::MonotonicInstant::now() - start_time;
333        let expected_duration = Duration::from_millis(BACKOFF_DURATION) * pending_count as u32;
334
335        assert!(expected_duration <= actual_duration);
336
337        result
338    }
339
340    // Return `attempts` durations.
341    fn backoff(attempts: usize) -> impl Iterator<Item = Duration> {
342        iter::repeat(Duration::from_millis(BACKOFF_DURATION)).take(attempts)
343    }
344
345    #[test]
346    fn test_should_succeed() {
347        for i in 0..5 {
348            // to test passing, always attempt one more attempt than necessary before Counter
349            // succeeds.
350
351            assert_eq!(run(retry_or_first_error(backoff(i + 1), Counter::ok_at(i)), i), Ok(i));
352            assert_eq!(run(retry_or_last_error(backoff(i + 1), Counter::ok_at(i)), i), Ok(i));
353            assert_eq!(
354                run(retry_or_collect_errors(backoff(i + 1), Counter::ok_at(i)), i),
355                Ok::<usize, Vec<usize>>(i)
356            );
357
358            // Check FnMut impl works. It always succeeds during the first iteration.
359            let task = || future::ready(Ok::<_, ()>(i));
360            assert_eq!(run(retry_or_last_error(backoff(i + 1), task), 0), Ok(i));
361        }
362    }
363
364    #[test]
365    fn test_should_error() {
366        for i in 0..5 {
367            assert_eq!(run(retry_or_first_error(backoff(i), Counter::never_ok()), i), Err(0));
368            assert_eq!(run(retry_or_last_error(backoff(i), Counter::never_ok()), i), Err(i));
369            assert_eq!(
370                run(retry_or_collect_errors(backoff(i), Counter::never_ok()), i),
371                Err::<usize, Vec<usize>>((0..=i).collect())
372            );
373
374            // Check FnMut impl works.
375            let task = || future::ready(Err::<(), _>(i));
376            assert_eq!(run(retry_or_last_error(backoff(i), task), i), Err(i));
377        }
378    }
379}