1// Copyright 2015-2016 Benjamin Fry <benjaminfry@me.com>
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
//! `RetryDnsHandle` allows DNS queries to be reattempted on failure
910use std::pin::Pin;
11use std::task::{Context, Poll};
1213use futures_util::stream::{Stream, StreamExt};
1415use crate::error::{ProtoError, ProtoErrorKind};
16use crate::xfer::{DnsRequest, DnsResponse};
17use crate::DnsHandle;
1819/// Can be used to reattempt queries if they fail
20///
21/// Note: this does not reattempt queries that fail with a negative response.
22/// For example, if a query gets a `NODATA` response from a name server, the
23/// query will not be retried. It only reattempts queries that effectively
24/// failed to get a response, such as queries that resulted in IO or timeout
25/// errors.
26///
27/// Whether an error is retryable by the [`RetryDnsHandle`] is determined by the
28/// [`RetryableError`] trait.
29///
30/// *note* Current value of this is not clear, it may be removed
31#[derive(Clone)]
32#[must_use = "queries can only be sent through a ClientHandle"]
33pub struct RetryDnsHandle<H>
34where
35H: DnsHandle + Unpin + Send,
36 H::Error: RetryableError,
37{
38 handle: H,
39 attempts: usize,
40}
4142impl<H> RetryDnsHandle<H>
43where
44H: DnsHandle + Unpin + Send,
45 H::Error: RetryableError,
46{
47/// Creates a new Client handler for reattempting requests on failures.
48 ///
49 /// # Arguments
50 ///
51 /// * `handle` - handle to the dns connection
52 /// * `attempts` - number of attempts before failing
53pub fn new(handle: H, attempts: usize) -> Self {
54Self { handle, attempts }
55 }
56}
5758impl<H> DnsHandle for RetryDnsHandle<H>
59where
60H: DnsHandle + Send + Unpin + 'static,
61 H::Error: RetryableError,
62{
63type Response = Pin<Box<dyn Stream<Item = Result<DnsResponse, Self::Error>> + Send + Unpin>>;
64type Error = <H as DnsHandle>::Error;
6566fn send<R: Into<DnsRequest>>(&mut self, request: R) -> Self::Response {
67let request = request.into();
6869// need to clone here so that the retry can resend if necessary...
70 // obviously it would be nice to be lazy about this...
71let stream = self.handle.send(request.clone());
7273 Box::pin(RetrySendStream {
74 request,
75 handle: self.handle.clone(),
76 stream,
77 remaining_attempts: self.attempts,
78 })
79 }
80}
8182/// A stream for retrying (on failure, for the remaining number of times specified)
83struct RetrySendStream<H>
84where
85H: DnsHandle,
86{
87 request: DnsRequest,
88 handle: H,
89 stream: <H as DnsHandle>::Response,
90 remaining_attempts: usize,
91}
9293impl<H: DnsHandle + Unpin> Stream for RetrySendStream<H>
94where
95<H as DnsHandle>::Error: RetryableError,
96{
97type Item = Result<DnsResponse, <H as DnsHandle>::Error>;
9899fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
100// loop over the stream, on errors, spawn a new stream
101 // on ready and not ready return.
102loop {
103match self.stream.poll_next_unpin(cx) {
104 Poll::Ready(Some(Err(e))) => {
105if self.remaining_attempts == 0 || !e.should_retry() {
106return Poll::Ready(Some(Err(e)));
107 }
108109if e.attempted() {
110self.remaining_attempts -= 1;
111 }
112113// TODO: if the "sent" Message is part of the error result,
114 // then we can just reuse it... and no clone necessary
115let request = self.request.clone();
116self.stream = self.handle.send(request);
117 }
118 poll => return poll,
119 }
120 }
121 }
122}
/// Classifies errors for the retry logic: which ones are retried, and which
/// ones consume part of the retry budget
pub trait RetryableError {
    /// Returns `true` if the query should be sent again after this error
    fn should_retry(&self) -> bool;

    /// Returns `true` if this error should count as an attempt
    fn attempted(&self) -> bool;
}
131132impl RetryableError for ProtoError {
133fn should_retry(&self) -> bool {
134true
135}
136137fn attempted(&self) -> bool {
138 !matches!(self.kind(), ProtoErrorKind::Busy)
139 }
140}
#[cfg(test)]
mod test {
    use super::*;
    use crate::error::*;
    use crate::op::*;
    use crate::xfer::FirstAnswer;
    use futures_executor::block_on;
    use futures_util::future::*;
    use futures_util::stream::*;
    use std::sync::{
        atomic::{AtomicU16, Ordering},
        Arc,
    };
    use DnsHandle;

    /// Fake handle that fails a fixed number of sends and then, optionally,
    /// answers successfully.
    #[derive(Clone)]
    struct TestClient {
        /// Whether a send may succeed once enough failures were recorded
        last_succeed: bool,
        /// Number of recorded failures required before a success is possible
        retries: u16,
        /// Count of failed sends, shared between clones of this client
        attempts: Arc<AtomicU16>,
    }

    impl DnsHandle for TestClient {
        type Response = Box<dyn Stream<Item = Result<DnsResponse, ProtoError>> + Send + Unpin>;
        type Error = ProtoError;

        fn send<R: Into<DnsRequest>>(&mut self, _: R) -> Self::Response {
            let failures_so_far = self.attempts.load(Ordering::SeqCst);

            let may_succeed = self.last_succeed && failures_so_far >= self.retries;
            if may_succeed {
                // The id of the reply carries the failure count back to the test.
                let mut reply = Message::new();
                reply.set_id(failures_so_far);
                return Box::new(once(ok(reply.into())));
            }

            self.attempts.fetch_add(1, Ordering::SeqCst);
            Box::new(once(err(ProtoError::from("last retry set to fail"))))
        }
    }

    /// Builds a retrying handle with a budget of 2 around a `TestClient` that
    /// needs one recorded failure before it may succeed.
    fn retrying_client(last_succeed: bool) -> RetryDnsHandle<TestClient> {
        let inner = TestClient {
            last_succeed,
            retries: 1,
            attempts: Arc::new(AtomicU16::new(0)),
        };
        RetryDnsHandle::new(inner, 2)
    }

    #[test]
    fn test_retry() {
        let mut handle = retrying_client(true);
        let query = Message::new();
        let answer = block_on(handle.send(query).first_answer()).expect("should have succeeded");
        // the id reports how many sends the TestClient failed before answering
        assert_eq!(answer.id(), 1);
    }

    #[test]
    fn test_error() {
        let mut client = retrying_client(false);
        let query = Message::new();
        let outcome = block_on(client.send(query).first_answer());
        assert!(outcome.is_err());
    }
}