fuchsia_backoff/lib.rs
1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use fuchsia_async::{self as fasync, DurationExt};
6use futures::Future;
7use std::time::Duration;
8
9/// Execute the async task. If it errs out, attempt to run the task again after a delay if the
10/// `backoff` yields a duration. Otherwise, return the first error that occurred to the caller.
11///
12/// # Examples
13///
14/// `retry_or_first_error` will succeed if the task returns `Ok` before the `backoff` returns None.
15///
16/// ```
17/// # use std::iter::repeat;
18/// # use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
19/// let counter = Arc::new(AtomicUsize::new(0));
20/// let result = retry_or_first_error(
21/// repeat(Duration::from_secs(1)),
22/// || async {
23/// let count = counter.fetch_add(1, Ordering::SeqCst);
24/// if count == 5 {
25/// Ok(count)
26/// } else {
27/// Err(count)
28/// }
29/// }),
30/// ).await;
31/// assert_eq!(result, Ok(5));
32/// ```
33///
34/// If the task fails, the `retry_or_first_error` will return the first error.
35///
36/// ```
37/// # use std::iter::repeat;
38/// # use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
39/// let counter = Arc::new(AtomicUsize::new(0));
40/// let result = retry_or_first_error(
41/// repeat(Duration::from_secs(1)).take(5),
42/// || async {
43/// let count = counter.fetch_add(1, Ordering::SeqCst);
44/// Err(count)
45/// }),
46/// ).await;
47/// assert_eq!(result, Err(0));
48/// ```
49pub async fn retry_or_first_error<B, T>(mut backoff: B, task: T) -> Result<T::Ok, T::Error>
50where
51 B: Backoff<T::Error>,
52 T: Task,
53{
54 match next(&mut backoff, task).await {
55 Ok(value) => Ok(value),
56 Err((err, None)) => Err(err),
57 Err((err, Some(task))) => match retry_or_last_error(backoff, task).await {
58 Ok(value) => Ok(value),
59 Err(_) => Err(err),
60 },
61 }
62}
63
64/// Execute the async task. If it errs out, attempt to run the task again after a delay if the
65/// `backoff` yields a duration. Otherwise, return the last error that occurred to the caller.
66///
67/// # Examples
68///
69/// `retry_or_last_error` will succeed if the task returns `Ok` before the `backoff` returns None.
70///
71/// ```
72/// # use std::iter::repeat;
73/// # use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
74/// let counter = Arc::new(AtomicUsize::new(0));
75/// let result = retry_or_last_error(
76/// repeat(Duration::from_secs(1)),
77/// || async {
78/// let count = counter.fetch_add(1, Ordering::SeqCst);
79/// if count == 5 {
80/// Ok(count)
81/// } else {
82/// Err(count)
83/// }
84/// }),
85/// ).await;
86/// assert_eq!(result, Ok(5));
87/// ```
88///
89/// If the task fails, the `retry_or_last_error` will return the last error.
90///
91/// ```
92/// # use std::iter::repeat;
93/// # use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
94/// let counter = Arc::new(AtomicUsize::new(0));
95/// let result = retry_or_last_error(
96/// repeat(Duration::from_secs(1)).take(5),
97/// || async {
98/// let count = counter.fetch_add(1, Ordering::SeqCst);
99/// Err(count)
100/// }),
101/// ).await;
102/// assert_eq!(result, Err(5));
103/// ```
104pub async fn retry_or_last_error<B, T>(mut backoff: B, mut task: T) -> Result<T::Ok, T::Error>
105where
106 B: Backoff<T::Error>,
107 T: Task,
108{
109 loop {
110 match next(&mut backoff, task).await {
111 Ok(value) => {
112 return Ok(value);
113 }
114 Err((err, None)) => {
115 return Err(err);
116 }
117 Err((_, Some(next))) => {
118 task = next;
119 }
120 }
121 }
122}
123
124/// Execute the async task. If it errs out, attempt to run the task again after a delay if the
125/// `backoff` yields a duration. Otherwise, collect all the errors and return them to the caller.
126///
127/// # Examples
128///
129/// `retry_or_last_error` will succeed if it returns `Ok` before the `backoff` returns None.
130///
131/// ```
132/// # use std::iter::repeat;
133/// # use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
134/// let counter = Arc::new(AtomicUsize::new(0));
135/// let result = retry_or_collect_errors(
136/// repeat(Duration::from_secs(1)),
137/// || async {
138/// let count = counter.fetch_add(1, Ordering::SeqCst);
139/// if count == 5 {
140/// Ok(count)
141/// } else {
142/// Err(count)
143/// }
144/// }),
145/// ).await;
146/// assert_eq!(result, Ok(5));
147/// ```
148///
149/// If the task fails, it will return all the errors.
150///
151/// ```
152/// # use std::iter::repeat;
153/// # use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
154/// let counter = Arc::new(AtomicUsize::new(0));
155/// let result = retry_or_last_error(
156/// repeat(Duration::from_secs(1)).take(5),
157/// || async {
158/// let count = counter.fetch_add(1, Ordering::SeqCst);
159/// Err(count)
160/// }),
161/// ).await;
162/// assert_eq!(result, Err(vec![0, 1, 2, 3, 4]));
163/// ```
164pub async fn retry_or_collect_errors<B, T, C>(mut backoff: B, mut task: T) -> Result<T::Ok, C>
165where
166 B: Backoff<T::Error>,
167 T: Task,
168 C: Default + Extend<T::Error>,
169{
170 let mut collection = C::default();
171 loop {
172 match next(&mut backoff, task).await {
173 Ok(value) => {
174 return Ok(value);
175 }
176 Err((err, next)) => {
177 collection.extend(Some(err));
178 match next {
179 Some(next) => {
180 task = next;
181 }
182 None => {
183 return Err(collection);
184 }
185 }
186 }
187 }
188 }
189}
190
191async fn next<B, T>(backoff: &mut B, mut task: T) -> Result<T::Ok, (T::Error, Option<T>)>
192where
193 B: Backoff<T::Error>,
194 T: Task,
195{
196 match task.run().await {
197 Ok(value) => Ok(value),
198 Err(err) => match backoff.next_backoff(&err) {
199 Some(delay) => {
200 let delay = delay.after_now();
201 fasync::Timer::new(delay).await;
202 Err((err, Some(task)))
203 }
204 None => Err((err, None)),
205 },
206 }
207}
208
/// A task produces an asynchronous computation that can be retried if the returned future fails
/// with some error.
///
/// The retry functions in this file call [`Task::run`] once per attempt and await the future it
/// returns, so an implementation must be able to produce a fresh future on every call.
pub trait Task {
    /// The type of successful values yielded by the task future.
    type Ok;

    /// The type of failures yielded by the task future.
    type Error;

    /// The future returned when executing this task.
    type Future: Future<Output = Result<Self::Ok, Self::Error>>;

    /// Start one attempt and return the future that drives it.
    ///
    /// Takes `&mut self`, so an implementation may update its own state between attempts.
    fn run(&mut self) -> Self::Future;
}
224
225impl<F, Fut, T, E> Task for F
226where
227 F: FnMut() -> Fut,
228 Fut: Future<Output = Result<T, E>>,
229{
230 type Ok = T;
231 type Error = E;
232 type Future = Fut;
233
234 fn run(&mut self) -> Self::Future {
235 (self)()
236 }
237}
238
/// A backoff policy for deciding to retry an operation.
///
/// The type parameter `E` is the error type of the task being retried.
pub trait Backoff<E> {
    /// Decide what to do after an attempt failed with `err`.
    ///
    /// Returning `Some(delay)` makes the retry functions in this file wait for `delay` and then
    /// run the task again; returning `None` makes them stop retrying.
    fn next_backoff(&mut self, err: &E) -> Option<Duration>;
}
243
244impl<E, I> Backoff<E> for I
245where
246 I: Iterator<Item = Duration>,
247{
248 fn next_backoff(&mut self, _: &E) -> Option<Duration> {
249 self.next()
250 }
251}
252
#[cfg(test)]
mod tests {
    use super::*;
    use futures::prelude::*;
    use std::iter;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    /// Length, in milliseconds, of every delay produced by the test backoff policy.
    const BACKOFF_DURATION: u64 = 100;

    /// A task that counts its attempts and succeeds only on the attempt numbered `ok_at`, if any.
    #[derive(Clone)]
    struct Counter {
        counter: Arc<AtomicUsize>,
        ok_at: Option<usize>,
    }

    impl Counter {
        /// Shared constructor: the attempt counter always starts at zero.
        fn starting_at_zero(ok_at: Option<usize>) -> Self {
            Self { counter: Arc::new(AtomicUsize::new(0)), ok_at }
        }

        /// A counter that succeeds on the attempt whose zero-based index equals `count`.
        fn ok_at(count: usize) -> Self {
            Self::starting_at_zero(Some(count))
        }

        /// A counter whose every attempt fails.
        fn never_ok() -> Self {
            Self::starting_at_zero(None)
        }
    }

    impl Task for Counter {
        type Ok = usize;
        type Error = usize;
        type Future = future::Ready<Result<usize, usize>>;

        /// Yields the zero-based attempt index, as `Ok` on the chosen attempt and `Err` otherwise.
        fn run(&mut self) -> Self::Future {
            let attempt = self.counter.fetch_add(1, Ordering::SeqCst);
            let outcome = if self.ok_at == Some(attempt) { Ok(attempt) } else { Err(attempt) };
            future::ready(outcome)
        }
    }

    /// Drives `future` to completion under fake time, expecting it to stall on a backoff timer
    /// exactly `pending_count` times before it finishes.
    #[cfg(target_os = "fuchsia")]
    fn run<F>(future: F, pending_count: usize) -> F::Output
    where
        F: Future,
        F::Output: std::fmt::Debug + PartialEq + Eq,
    {
        use std::pin::pin;
        use std::task::Poll;

        let mut pinned = pin!(future);
        let mut executor = fasync::TestExecutor::new_with_fake_time();

        for _ in 0..pending_count {
            // The future must be parked on a timer that has not expired yet.
            assert_eq!(executor.run_until_stalled(&mut pinned), Poll::Pending);
            assert!(!executor.wake_expired_timers());

            // Advancing fake time by one backoff period must expire that timer.
            let deadline = Duration::from_millis(BACKOFF_DURATION).after_now();
            executor.set_fake_time(deadline);
            assert!(executor.wake_expired_timers());
        }

        let Poll::Ready(output) = executor.run_until_stalled(&mut pinned) else {
            panic!("expected future to be ready");
        };
        output
    }

    /// Drives `future` to completion in real time and checks that it took at least
    /// `pending_count` backoff periods.
    #[cfg(not(target_os = "fuchsia"))]
    fn run<F>(future: F, pending_count: usize) -> F::Output
    where
        F: Future,
        F::Output: std::fmt::Debug + PartialEq + Eq,
    {
        // The host-side executor doesn't support fake time, so just run the future and make sure
        // it ran longer than the backoff duration.
        let mut executor = fasync::LocalExecutor::new();

        let started = fasync::MonotonicInstant::now();
        let output = executor.run_singlethreaded(future);
        let elapsed = fasync::MonotonicInstant::now() - started;

        let minimum = Duration::from_millis(BACKOFF_DURATION) * pending_count as u32;
        assert!(minimum <= elapsed);

        output
    }

    /// Returns a backoff policy that yields `attempts` delays of `BACKOFF_DURATION` milliseconds.
    fn backoff(attempts: usize) -> impl Iterator<Item = Duration> {
        let delay = Duration::from_millis(BACKOFF_DURATION);
        iter::repeat(delay).take(attempts)
    }

    #[test]
    fn test_should_succeed() {
        for ok_at in 0..5 {
            // To test passing, always allow one more retry than the Counter needs before it
            // succeeds.
            let retries = ok_at + 1;

            let first = run(retry_or_first_error(backoff(retries), Counter::ok_at(ok_at)), ok_at);
            assert_eq!(first, Ok(ok_at));

            let last = run(retry_or_last_error(backoff(retries), Counter::ok_at(ok_at)), ok_at);
            assert_eq!(last, Ok(ok_at));

            let collected = run(
                retry_or_collect_errors::<_, _, Vec<usize>>(
                    backoff(retries),
                    Counter::ok_at(ok_at),
                ),
                ok_at,
            );
            assert_eq!(collected, Ok(ok_at));

            // Check FnMut impl works. It always succeeds during the first iteration.
            let task = || future::ready(Ok::<_, ()>(ok_at));
            assert_eq!(run(retry_or_last_error(backoff(retries), task), 0), Ok(ok_at));
        }
    }

    #[test]
    fn test_should_error() {
        for retries in 0..5 {
            let first = run(retry_or_first_error(backoff(retries), Counter::never_ok()), retries);
            assert_eq!(first, Err(0));

            let last = run(retry_or_last_error(backoff(retries), Counter::never_ok()), retries);
            assert_eq!(last, Err(retries));

            // One error per attempt: the initial attempt plus one for every retry.
            let collected = run(
                retry_or_collect_errors::<_, _, Vec<usize>>(backoff(retries), Counter::never_ok()),
                retries,
            );
            let every_error: Vec<usize> = (0..=retries).collect();
            assert_eq!(collected, Err(every_error));

            // Check FnMut impl works.
            let task = || future::ready(Err::<(), _>(retries));
            assert_eq!(run(retry_or_last_error(backoff(retries), task), retries), Err(retries));
        }
    }
}
379}