fuchsia_pkg_testing/serve/
responder.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! HttpResponder implementations
6
7use crate::serve::HttpResponder;
8use fuchsia_sync::Mutex;
9use futures::channel::{mpsc, oneshot};
10use futures::future::{BoxFuture, Shared, pending, ready};
11use futures::prelude::*;
12use hyper::{Body, Request, Response, StatusCode};
13use std::collections::HashSet;
14use std::path::{Path, PathBuf};
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU32, Ordering};
17
18/// hyper::Request extension trait that makes writing `HttpResponder`s more convienent.
19pub trait RequestExt {
20    /// The URI path of the Request.
21    fn path(&self) -> &Path;
22}
23
24impl RequestExt for Request<Body> {
25    /// The URI path of the Request.
26    fn path(&self) -> &Path {
27        Path::new(self.uri().path())
28    }
29}
30
31/// Responder that always responds with the given status code
32pub struct StaticResponseCode(StatusCode);
33
34impl HttpResponder for StaticResponseCode {
35    fn respond(&self, _: &Request<Body>, _: Response<Body>) -> BoxFuture<'_, Response<Body>> {
36        ready(Response::builder().status(self.0).body(Body::empty()).unwrap()).boxed()
37    }
38}
39
40impl StaticResponseCode {
41    /// Creates responder that always responds with the given status code
42    pub fn new(status: StatusCode) -> Self {
43        Self(status)
44    }
45
46    /// Creates responder that always responds with 200 OK
47    pub fn ok() -> Self {
48        Self(StatusCode::OK)
49    }
50
51    /// Creates responder that always responds with 404 Not Found
52    pub fn not_found() -> Self {
53        Self(StatusCode::NOT_FOUND)
54    }
55
56    /// Creates responder that always responds with 500 Internal Server Error
57    pub fn server_error() -> Self {
58        Self(StatusCode::INTERNAL_SERVER_ERROR)
59    }
60
61    /// Creates responder that always responds with 429 Too Many Requests
62    pub fn too_many_requests() -> Self {
63        Self(StatusCode::TOO_MANY_REQUESTS)
64    }
65}
66
67/// An atomic HTTP status code carrier.
68#[derive(Debug, Default)]
69pub struct DynamicResponseSetter(Arc<AtomicU16>);
70
71impl DynamicResponseSetter {
72    /// Atomically sets this toggle to the supplied code.
73    pub fn set(&self, code: u16) {
74        self.0.store(code, Ordering::SeqCst);
75    }
76}
77
78/// Responder that replies with an externally-settable HTTP status.
79pub struct DynamicResponseCode {
80    code: Arc<AtomicU16>,
81}
82
83impl HttpResponder for DynamicResponseCode {
84    fn respond<'a>(
85        &'a self,
86        _: &'a Request<Body>,
87        _: Response<Body>,
88    ) -> BoxFuture<'_, Response<Body>> {
89        ready(
90            Response::builder()
91                .status(self.code.load(Ordering::SeqCst))
92                .body(Body::empty())
93                .unwrap(),
94        )
95        .boxed()
96    }
97}
98
99impl DynamicResponseCode {
100    /// Creates a new responder with a (re)settable status code.
101    pub fn new(initial: u16) -> (Self, DynamicResponseSetter) {
102        let setter = DynamicResponseSetter(Arc::new(initial.into()));
103        (Self { code: Arc::clone(&setter.0) }, setter)
104    }
105}
106
107/// An atomic toggle switch.
108#[derive(Debug, Default)]
109pub struct AtomicToggle(Arc<AtomicBool>);
110
111impl AtomicToggle {
112    /// Creates a new AtomicToggle initialized to `initial`.
113    pub fn new(initial: bool) -> Self {
114        Self(Arc::new(initial.into()))
115    }
116
117    /// Atomically sets this toggle to true.
118    pub fn set(&self) {
119        self.0.store(true, Ordering::SeqCst);
120    }
121
122    /// Atomically sets this toggle to false.
123    pub fn unset(&self) {
124        self.0.store(false, Ordering::SeqCst);
125    }
126}
127
128/// Responder that overrides requests with the given responder only when enabled
129pub struct Toggleable<H: HttpResponder> {
130    enabled: Arc<AtomicBool>,
131    responder: H,
132}
133
134impl<H: HttpResponder> HttpResponder for Toggleable<H> {
135    fn respond<'a>(
136        &'a self,
137        request: &'a Request<Body>,
138        response: Response<Body>,
139    ) -> BoxFuture<'_, Response<Body>> {
140        if self.enabled.load(Ordering::SeqCst) {
141            self.responder.respond(request, response)
142        } else {
143            ready(response).boxed()
144        }
145    }
146}
147
148impl<H: HttpResponder> Toggleable<H> {
149    /// Creates responder that overrides requests when should_override is set.
150    pub fn new(should_override: &AtomicToggle, responder: H) -> Self {
151        Self { enabled: Arc::clone(&should_override.0), responder }
152    }
153}
154
155/// Responder that overrides the given request path for the given number of requests.
156pub struct ForRequestCount<H: HttpResponder> {
157    remaining: Mutex<u32>,
158    responder: H,
159}
160
161impl<H: HttpResponder> HttpResponder for ForRequestCount<H> {
162    fn respond<'a>(
163        &'a self,
164        request: &'a Request<Body>,
165        response: Response<Body>,
166    ) -> BoxFuture<'_, Response<Body>> {
167        let mut remaining = self.remaining.lock();
168        if *remaining > 0 {
169            *remaining -= 1;
170            drop(remaining);
171            self.responder.respond(request, response)
172        } else {
173            ready(response).boxed()
174        }
175    }
176}
177
178impl<H: HttpResponder> ForRequestCount<H> {
179    /// Creates responder that overrides the given request path for the given number of requests.
180    pub fn new(count: u32, responder: H) -> Self {
181        Self { remaining: Mutex::new(count), responder }
182    }
183}
184
185/// Responder that overrides the given request path using the given responder.
186pub struct ForPath<H: HttpResponder> {
187    path: PathBuf,
188    responder: H,
189}
190
191impl<H: HttpResponder> HttpResponder for ForPath<H> {
192    fn respond<'a>(
193        &'a self,
194        request: &'a Request<Body>,
195        response: Response<Body>,
196    ) -> BoxFuture<'_, Response<Body>> {
197        if self.path == request.path() {
198            self.responder.respond(request, response)
199        } else {
200            ready(response).boxed()
201        }
202    }
203}
204
205impl<H: HttpResponder> ForPath<H> {
206    /// Creates responder that overrides the given request path using the given responder.
207    pub fn new(path: impl Into<PathBuf>, responder: H) -> Self {
208        Self { path: path.into(), responder }
209    }
210}
211
212/// Responder that overrides the given request paths using the given responder.
213pub struct ForPaths<H: HttpResponder> {
214    paths: HashSet<PathBuf>,
215    responder: H,
216}
217
218impl<H: HttpResponder> HttpResponder for ForPaths<H> {
219    fn respond<'a>(
220        &'a self,
221        request: &'a Request<Body>,
222        response: Response<Body>,
223    ) -> BoxFuture<'_, Response<Body>> {
224        if self.paths.contains(request.path()) {
225            self.responder.respond(request, response)
226        } else {
227            ready(response).boxed()
228        }
229    }
230}
231
232impl<H: HttpResponder> ForPaths<H> {
233    /// Creates responder that overrides the given request paths using the given responder.
234    pub fn new(paths: HashSet<PathBuf>, responder: H) -> Self {
235        Self { paths, responder }
236    }
237}
238
239/// Responder that overrides all the requests that start with the given request path using the
240/// given responder.
241pub struct ForPathPrefix<H: HttpResponder> {
242    prefix: PathBuf,
243    responder: H,
244}
245
246impl<H: HttpResponder> HttpResponder for ForPathPrefix<H> {
247    fn respond<'a>(
248        &'a self,
249        request: &'a Request<Body>,
250        response: Response<Body>,
251    ) -> BoxFuture<'_, Response<Body>> {
252        if request.path().starts_with(&self.prefix) {
253            self.responder.respond(request, response)
254        } else {
255            ready(response).boxed()
256        }
257    }
258}
259
260impl<H: HttpResponder> ForPathPrefix<H> {
261    /// Creates responder that overrides all the requests that start with the given request path
262    /// using the given responder.
263    pub fn new(prefix: impl Into<PathBuf>, responder: H) -> Self {
264        Self { prefix: prefix.into(), responder }
265    }
266}
267
268/// Responder that overrides all the requests that end with the given request path using the given responder.
269///
270/// Useful for hitting all versions of versioned TUF metadata (e.g. X.targets.json).
271/// TODO(ampearce): change ForPathSuffix and ForPathPrefix to use string matches rather than path.
272pub struct ForPathSuffix<H: HttpResponder> {
273    suffix: PathBuf,
274    responder: H,
275}
276
277impl<H: HttpResponder> HttpResponder for ForPathSuffix<H> {
278    fn respond<'a>(
279        &'a self,
280        request: &'a Request<Body>,
281        response: Response<Body>,
282    ) -> BoxFuture<'_, Response<Body>> {
283        if request.path().ends_with(&self.suffix) {
284            self.responder.respond(request, response)
285        } else {
286            ready(response).boxed()
287        }
288    }
289}
290
291impl<H: HttpResponder> ForPathSuffix<H> {
292    /// Creates responder that overrides all the requests that start with the given request path
293    /// using the given responder.
294    pub fn new(suffix: impl Into<PathBuf>, responder: H) -> Self {
295        Self { suffix: suffix.into(), responder }
296    }
297}
298/// Responder that passes responses through the given responder once per unique path.
299pub struct OncePerPath<H: HttpResponder> {
300    responder: H,
301    failed_paths: Mutex<HashSet<PathBuf>>,
302}
303
304impl<H: HttpResponder> HttpResponder for OncePerPath<H> {
305    fn respond<'a>(
306        &'a self,
307        request: &'a Request<Body>,
308        response: Response<Body>,
309    ) -> BoxFuture<'_, Response<Body>> {
310        if self.failed_paths.lock().insert(request.path().to_owned()) {
311            self.responder.respond(request, response)
312        } else {
313            ready(response).boxed()
314        }
315    }
316}
317
318impl<H: HttpResponder> OncePerPath<H> {
319    /// Creates responder that passes responses through the given responder once per unique path.
320    pub fn new(responder: H) -> Self {
321        Self { responder, failed_paths: Mutex::new(HashSet::new()) }
322    }
323}
324
325/// Transform a `serde_json::Value`. Implements `HttpResponder` by assuming the `Response<Body>` is
326/// json-formatted.
327pub trait JsonTransformer: Send + Sync + Clone + 'static {
328    /// Transform a `serde_json::Value`
329    fn transform(&self, v: serde_json::Value) -> serde_json::Value;
330}
331
332impl<F> JsonTransformer for F
333where
334    F: Fn(serde_json::Value) -> serde_json::Value + Send + Sync + Clone + 'static,
335{
336    fn transform(&self, v: serde_json::Value) -> serde_json::Value {
337        (self)(v)
338    }
339}
340
341/// Responder that manipulates requests with json-formatted bodies.
342impl<T: JsonTransformer> HttpResponder for T {
343    fn respond(
344        &self,
345        _: &Request<Body>,
346        response: Response<Body>,
347    ) -> BoxFuture<'_, Response<Body>> {
348        async move {
349            let (mut parts, body) = response.into_parts();
350            parts.headers.remove(http::header::CONTENT_LENGTH);
351            let bytes = body_to_bytes(body).await;
352            let value = self.transform(serde_json::from_reader(bytes.as_slice()).unwrap());
353            let bytes = serde_json::to_vec(&value).unwrap();
354            Response::from_parts(parts, Body::from(bytes))
355        }
356        .boxed()
357    }
358}
359
360/// Responder that notifies a channel when it receives a request.
361pub struct NotifyWhenRequested {
362    notify: mpsc::UnboundedSender<()>,
363}
364
365impl NotifyWhenRequested {
366    /// Creates a new responder and the receiver it notifies on request receipt.
367    pub fn new() -> (Self, mpsc::UnboundedReceiver<()>) {
368        let (tx, rx) = mpsc::unbounded();
369        (Self { notify: tx }, rx)
370    }
371}
372
373impl HttpResponder for NotifyWhenRequested {
374    fn respond(
375        &self,
376        _: &Request<Body>,
377        response: Response<Body>,
378    ) -> BoxFuture<'_, Response<Body>> {
379        self.notify.unbounded_send(()).unwrap();
380        ready(response).boxed()
381    }
382}
383
384/// A response that is waiting to be sent.
385pub struct BlockedResponse {
386    path: PathBuf,
387    unblocker: oneshot::Sender<()>,
388}
389
390impl BlockedResponse {
391    /// The path of the request.
392    pub fn path(&self) -> &Path {
393        &self.path
394    }
395
396    /// Send the response.
397    pub fn unblock(self) {
398        self.unblocker.send(()).expect("request to still be pending")
399    }
400}
401
402/// Responder that blocks sending response headers and bodies until unblocked by a test.
403pub struct BlockResponseHeaders {
404    blocked_responses: mpsc::UnboundedSender<BlockedResponse>,
405}
406
407impl BlockResponseHeaders {
408    /// Creates a new responder and the receiver it notifies on request receipt.
409    pub fn new() -> (Self, mpsc::UnboundedReceiver<BlockedResponse>) {
410        let (sender, receiver) = mpsc::unbounded();
411
412        (Self { blocked_responses: sender }, receiver)
413    }
414}
415
416impl HttpResponder for BlockResponseHeaders {
417    fn respond(
418        &self,
419        request: &Request<Body>,
420        response: Response<Body>,
421    ) -> BoxFuture<'_, Response<Body>> {
422        // Return a future that notifies the test that the request was blocked and wait for it to
423        // unblock the response.
424        let path = request.path().to_owned();
425        let mut blocked_responses = self.blocked_responses.clone();
426        async move {
427            let (unblocker, waiter) = oneshot::channel();
428            blocked_responses
429                .send(BlockedResponse { path, unblocker })
430                .await
431                .expect("receiver to still exist");
432            waiter.await.expect("request to be unblocked");
433            response
434        }
435        .boxed()
436    }
437}
438
439/// Responder that blocks sending response body until unblocked by a test.
440/// Panics if requested more than once.
441pub struct BlockResponseBodyOnce {
442    #[allow(clippy::type_complexity)]
443    notify: Mutex<Option<oneshot::Sender<Box<dyn FnOnce() + Send>>>>,
444}
445
446impl BlockResponseBodyOnce {
447    /// Creates a new responder and the receiver it notifies after sending the response headers.
448    pub fn new() -> (Self, oneshot::Receiver<Box<dyn FnOnce() + Send>>) {
449        let (sender, receiver) = oneshot::channel();
450
451        (Self { notify: Mutex::new(Some(sender)) }, receiver)
452    }
453}
454
455impl HttpResponder for BlockResponseBodyOnce {
456    fn respond(
457        &self,
458        _: &Request<Body>,
459        mut response: Response<Body>,
460    ) -> BoxFuture<'_, Response<Body>> {
461        let notify = self.notify.lock().take().expect("a single request for this path");
462
463        async move {
464            // Replace the response's body with a stream that will yield data when the test
465            // unblocks the response body.
466            let (mut sender, new_body) = Body::channel();
467            let old_body = std::mem::replace(response.body_mut(), new_body);
468            let contents = body_to_bytes(old_body).await;
469
470            // Notify the test.
471            notify
472                .send(Box::new(move || {
473                    sender.try_send_data(contents.into()).expect("sending body")
474                }))
475                .map_err(|_| ())
476                .expect("receiver to still exist");
477
478            // Yield the modified response so hyper will send the headers and wait for the body to
479            // be unblocked.
480            response
481        }
482        .boxed()
483    }
484}
485
486async fn body_to_bytes(body: Body) -> Vec<u8> {
487    hyper::body::to_bytes(body).await.expect("body to bytes").to_vec()
488}
489
490/// Responder that yields the response up to the final byte, then produces an error.  Panics if the
491/// response contains an empty body.
492pub struct OneByteShortThenError;
493
494impl HttpResponder for OneByteShortThenError {
495    fn respond(
496        &self,
497        _: &Request<Body>,
498        response: Response<Body>,
499    ) -> BoxFuture<'_, Response<Body>> {
500        async {
501            let (parts, body) = response.into_parts();
502            let mut bytes = body_to_bytes(body).await;
503            if bytes.pop().is_none() {
504                panic!("can't short 0 bytes");
505            }
506            Response::from_parts(
507                parts,
508                Body::wrap_stream(futures::stream::iter(vec![
509                    Ok(bytes),
510                    Err("all_but_one_byte_then_eror has sent all but one bytes".to_string()),
511                ])),
512            )
513        }
514        .boxed()
515    }
516}
517
518/// Responder that yields the response up to the Nth byte, then produces an error.  Panics if the
519/// response does not contain more than N bytes.
520pub struct NBytesThenError {
521    n: usize,
522}
523
524impl NBytesThenError {
525    /// Make a responder that returns N bytes then errors.
526    pub fn new(n: usize) -> Self {
527        Self { n }
528    }
529}
530impl HttpResponder for NBytesThenError {
531    fn respond(
532        &self,
533        _: &Request<Body>,
534        response: Response<Body>,
535    ) -> BoxFuture<'_, Response<Body>> {
536        let n = self.n;
537        async move {
538            let (parts, body) = response.into_parts();
539            let mut bytes = body_to_bytes(body).await;
540            let initial_len = bytes.len();
541            if initial_len <= n {
542                panic!("not enough bytes to shorten, {initial_len} {n}");
543            }
544            bytes.truncate(n);
545            Response::from_parts(
546                parts,
547                Body::wrap_stream(futures::stream::iter(vec![
548                    Ok(bytes),
549                    Err("all_but_one_byte_then_eror has sent all but one bytes".to_string()),
550                ])),
551            )
552        }
553        .boxed()
554    }
555}
556
557/// Responder that yields the response up to the final byte, then disconnects.  Panics if the
558/// response contains an empty body.
559pub struct OneByteShortThenDisconnect;
560
561impl HttpResponder for OneByteShortThenDisconnect {
562    fn respond(
563        &self,
564        _: &Request<Body>,
565        response: Response<Body>,
566    ) -> BoxFuture<'_, Response<Body>> {
567        async {
568            let (parts, body) = response.into_parts();
569            let mut bytes = body_to_bytes(body).await;
570            if bytes.pop().is_none() {
571                panic!("can't short 0 bytes");
572            }
573            Response::from_parts(
574                parts,
575                Body::wrap_stream(futures::stream::iter(vec![Result::<Vec<u8>, String>::Ok(
576                    bytes,
577                )])),
578            )
579        }
580        .boxed()
581    }
582}
583
584/// Responder that flips the first byte of the response.  Panics if the response contains an empty
585/// body.
586pub struct OneByteFlipped;
587
588impl HttpResponder for OneByteFlipped {
589    fn respond(
590        &self,
591        _: &Request<Body>,
592        response: Response<Body>,
593    ) -> BoxFuture<'_, Response<Body>> {
594        async {
595            let (parts, body) = response.into_parts();
596            let mut bytes = body_to_bytes(body).await;
597            if bytes.is_empty() {
598                panic!("can't flip 0 bytes");
599            }
600            bytes[0] = !bytes[0];
601            Response::from_parts(parts, Body::from(bytes))
602        }
603        .boxed()
604    }
605}
606
607/// Responder that never sends bytes.
608pub struct Hang;
609
610impl HttpResponder for Hang {
611    fn respond(&self, _: &Request<Body>, _: Response<Body>) -> BoxFuture<'_, Response<Body>> {
612        pending().boxed()
613    }
614}
615
616/// Responder that sends the header but then never sends body bytes.
617pub struct HangBody;
618
619impl HttpResponder for HangBody {
620    fn respond(
621        &self,
622        _: &Request<Body>,
623        response: Response<Body>,
624    ) -> BoxFuture<'_, Response<Body>> {
625        async {
626            let (parts, _) = response.into_parts();
627            Response::from_parts(
628                parts,
629                Body::wrap_stream(futures::stream::pending::<Result<Vec<u8>, String>>()),
630            )
631        }
632        .boxed()
633    }
634}
635
636/// Responder that forwards to its wrapped responder once.
637pub struct Once<H: HttpResponder> {
638    already_forwarded: AtomicBool,
639    responder: H,
640}
641
642impl<H: HttpResponder> HttpResponder for Once<H> {
643    fn respond<'a>(
644        &'a self,
645        request: &'a Request<Body>,
646        response: Response<Body>,
647    ) -> BoxFuture<'_, Response<Body>> {
648        if self.already_forwarded.fetch_or(true, Ordering::SeqCst) {
649            ready(response).boxed()
650        } else {
651            self.responder.respond(request, response)
652        }
653    }
654}
655
656impl<H: HttpResponder> Once<H> {
657    /// Creates a responder that forwards to `responder` once.
658    pub fn new(responder: H) -> Self {
659        Self { already_forwarded: AtomicBool::new(false), responder }
660    }
661}
662
663/// Responder that forwards to its wrapped responder the nth time it is called.
664pub struct OverrideNth<H: HttpResponder> {
665    n: u32,
666    call_count: AtomicU32,
667    responder: H,
668}
669
670impl<H: HttpResponder> HttpResponder for OverrideNth<H> {
671    fn respond<'a>(
672        &'a self,
673        request: &'a Request<Body>,
674        response: Response<Body>,
675    ) -> BoxFuture<'_, Response<Body>> {
676        if self.call_count.fetch_add(1, Ordering::SeqCst) + 1 == self.n {
677            self.responder.respond(request, response)
678        } else {
679            ready(response).boxed()
680        }
681    }
682}
683
684impl<H: HttpResponder> OverrideNth<H> {
685    /// Creates a responder that forwards to `responder` on the nth call.
686    pub fn new(n: u32, responder: H) -> Self {
687        Self { n, call_count: AtomicU32::new(0), responder }
688    }
689}
690
691/// Information saved by Record for each request it handles.
692#[derive(Debug)]
693pub struct HistoryEntry {
694    uri_path: PathBuf,
695    headers: hyper::HeaderMap<hyper::header::HeaderValue>,
696}
697
698impl HistoryEntry {
699    /// The uri_path of the request.
700    pub fn uri_path(&self) -> &Path {
701        &self.uri_path
702    }
703
704    /// The request headers.
705    pub fn headers(&self) -> &http::HeaderMap<hyper::header::HeaderValue> {
706        &self.headers
707    }
708}
709
710/// The request history recorded by Record.
711pub struct History(Arc<Mutex<Vec<HistoryEntry>>>);
712
713impl History {
714    /// Take the recorded history, clearing it from the Record.
715    pub fn take(&self) -> Vec<HistoryEntry> {
716        std::mem::take(&mut self.0.lock())
717    }
718}
719
720/// Responder that records the requests.
721pub struct Record {
722    history: History,
723}
724
725impl Record {
726    /// Creates a responder that records all the requests.
727    pub fn new() -> (Self, History) {
728        let history = Arc::new(Mutex::new(vec![]));
729        (Self { history: History(Arc::clone(&history)) }, History(history))
730    }
731}
732
733impl HttpResponder for Record {
734    fn respond<'a>(
735        &'a self,
736        request: &'a Request<Body>,
737        response: Response<Body>,
738    ) -> BoxFuture<'_, Response<Body>> {
739        self.history.0.lock().push(HistoryEntry {
740            uri_path: request.path().to_owned(),
741            headers: request.headers().clone(),
742        });
743        ready(response).boxed()
744    }
745}
746
747/// Responder that forwards requests to its wrapped Responder if filter returns true.
748pub struct Filter<F: FilterFn, T: HttpResponder> {
749    filter: F,
750    handler: T,
751}
752
753/// Used by the Filter HttpResponder to decide which requests to forward and which to ignore.
754pub trait FilterFn: Send + Sync + 'static {
755    /// Return true iff Filter should forward the request to its wrapped Responder.
756    fn filter(&self, request: &Request<Body>) -> bool;
757}
758
759impl<F> FilterFn for F
760where
761    F: Fn(&Request<Body>) -> bool + Send + Sync + 'static,
762{
763    fn filter(&self, request: &Request<Body>) -> bool {
764        (self)(request)
765    }
766}
767
768/// Returns true iff the request has a Content-Range header.
769pub fn is_range_request(request: &Request<Body>) -> bool {
770    request.headers().get(http::header::RANGE).is_some()
771}
772
773impl<F: FilterFn, T: HttpResponder> Filter<F, T> {
774    /// Creates a responder that forwards requests that satisfy a filter.
775    pub fn new(filter: F, handler: T) -> Self {
776        Self { filter, handler }
777    }
778}
779
780impl<F: FilterFn, T: HttpResponder> HttpResponder for Filter<F, T> {
781    fn respond<'a>(
782        &'a self,
783        request: &'a Request<Body>,
784        response: Response<Body>,
785    ) -> BoxFuture<'_, Response<Body>> {
786        if self.filter.filter(request) {
787            self.handler.respond(request, response)
788        } else {
789            ready(response).boxed()
790        }
791    }
792}
793
794/// Responder that changes the status code to a given value.
795pub struct OverwriteStatusCode {
796    code: http::StatusCode,
797}
798
799impl OverwriteStatusCode {
800    /// Creates a responder that changes the status code to a given value.
801    pub fn new(code: http::StatusCode) -> Self {
802        Self { code }
803    }
804}
805
806impl HttpResponder for OverwriteStatusCode {
807    fn respond(
808        &self,
809        _: &Request<Body>,
810        mut response: Response<Body>,
811    ) -> BoxFuture<'_, Response<Body>> {
812        *response.status_mut() = self.code;
813        futures::future::ready(response).boxed()
814    }
815}
816
817/// Responder that calls each wrapped responder in order.
818pub struct Chain {
819    responders: Vec<Box<dyn HttpResponder>>,
820}
821
822impl Chain {
823    /// Creates a responder that calls each wrapped responder in order.
824    pub fn new(responders: Vec<Box<dyn HttpResponder>>) -> Self {
825        Self { responders }
826    }
827}
828
829impl HttpResponder for Chain {
830    fn respond<'a>(
831        &'a self,
832        request: &'a Request<Body>,
833        mut response: Response<Body>,
834    ) -> BoxFuture<'a, Response<Body>> {
835        async move {
836            for responder in self.responders.iter() {
837                response = responder.respond(request, response).await;
838            }
839            response
840        }
841        .boxed()
842    }
843}
844
845/// Fails all requests with NOT_FOUND.
846///
847/// All requests made to the first requested path are failed immediately.
848/// All requests to subsequent paths are blocked until the `unblocker` returned by
849///   new() is used, at which point all requests (pending and future) fail immediately.
850pub struct FailOneThenTemporarilyBlock {
851    path_to_fail: Arc<Mutex<Option<PathBuf>>>,
852    block_until: Shared<oneshot::Receiver<()>>,
853}
854
855impl FailOneThenTemporarilyBlock {
856    /// Create a FailOneThenTemporarilyBlock and its paired unblocker.
857    pub fn new() -> (Self, oneshot::Sender<()>) {
858        let (send, recv) = oneshot::channel();
859        (Self { path_to_fail: Arc::new(Mutex::new(None)), block_until: recv.shared() }, send)
860    }
861}
862
863impl HttpResponder for FailOneThenTemporarilyBlock {
864    fn respond(&self, request: &Request<Body>, _: Response<Body>) -> BoxFuture<'_, Response<Body>> {
865        let response =
866            Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty()).unwrap();
867        match &mut *self.path_to_fail.lock() {
868            o @ None => {
869                *o = Some(request.path().to_owned());
870                ready(response).boxed()
871            }
872            Some(path_to_fail) if path_to_fail == request.path() => ready(response).boxed(),
873            _ => {
874                let block_until = self.block_until.clone();
875                async move {
876                    block_until.await.unwrap();
877                    response
878                }
879            }
880            .boxed(),
881        }
882    }
883}