1use crate::serve::HttpResponder;
8use fuchsia_sync::Mutex;
9use futures::channel::{mpsc, oneshot};
10use futures::future::{BoxFuture, Shared, pending, ready};
11use futures::prelude::*;
12use hyper::{Body, Request, Response, StatusCode};
13use std::collections::HashSet;
14use std::path::{Path, PathBuf};
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, AtomicU16, AtomicU32, Ordering};
17
18pub trait RequestExt {
20 fn path(&self) -> &Path;
22}
23
24impl RequestExt for Request<Body> {
25 fn path(&self) -> &Path {
27 Path::new(self.uri().path())
28 }
29}
30
31pub struct StaticResponseCode(StatusCode);
33
34impl HttpResponder for StaticResponseCode {
35 fn respond(&self, _: &Request<Body>, _: Response<Body>) -> BoxFuture<'_, Response<Body>> {
36 ready(Response::builder().status(self.0).body(Body::empty()).unwrap()).boxed()
37 }
38}
39
40impl StaticResponseCode {
41 pub fn new(status: StatusCode) -> Self {
43 Self(status)
44 }
45
46 pub fn ok() -> Self {
48 Self(StatusCode::OK)
49 }
50
51 pub fn not_found() -> Self {
53 Self(StatusCode::NOT_FOUND)
54 }
55
56 pub fn server_error() -> Self {
58 Self(StatusCode::INTERNAL_SERVER_ERROR)
59 }
60
61 pub fn too_many_requests() -> Self {
63 Self(StatusCode::TOO_MANY_REQUESTS)
64 }
65}
66
/// Handle that updates a shared status code stored in an atomic.
#[derive(Debug, Default)]
pub struct DynamicResponseSetter(Arc<AtomicU16>);

impl DynamicResponseSetter {
    /// Stores `code` into the shared atomic with sequentially consistent ordering.
    pub fn set(&self, code: u16) {
        let Self(shared) = self;
        shared.store(code, Ordering::SeqCst);
    }
}
77
78pub struct DynamicResponseCode {
80 code: Arc<AtomicU16>,
81}
82
83impl HttpResponder for DynamicResponseCode {
84 fn respond<'a>(
85 &'a self,
86 _: &'a Request<Body>,
87 _: Response<Body>,
88 ) -> BoxFuture<'_, Response<Body>> {
89 ready(
90 Response::builder()
91 .status(self.code.load(Ordering::SeqCst))
92 .body(Body::empty())
93 .unwrap(),
94 )
95 .boxed()
96 }
97}
98
99impl DynamicResponseCode {
100 pub fn new(initial: u16) -> (Self, DynamicResponseSetter) {
102 let setter = DynamicResponseSetter(Arc::new(initial.into()));
103 (Self { code: Arc::clone(&setter.0) }, setter)
104 }
105}
106
/// Shared boolean flag that can be flipped on and off.
#[derive(Debug, Default)]
pub struct AtomicToggle(Arc<AtomicBool>);

impl AtomicToggle {
    /// Creates a toggle whose flag starts out as `initial`.
    pub fn new(initial: bool) -> Self {
        let flag = AtomicBool::new(initial);
        Self(Arc::new(flag))
    }

    /// Sets the flag to `true`.
    pub fn set(&self) {
        let Self(flag) = self;
        flag.store(true, Ordering::SeqCst);
    }

    /// Sets the flag to `false`.
    pub fn unset(&self) {
        let Self(flag) = self;
        flag.store(false, Ordering::SeqCst);
    }
}
127
128pub struct Toggleable<H: HttpResponder> {
130 enabled: Arc<AtomicBool>,
131 responder: H,
132}
133
134impl<H: HttpResponder> HttpResponder for Toggleable<H> {
135 fn respond<'a>(
136 &'a self,
137 request: &'a Request<Body>,
138 response: Response<Body>,
139 ) -> BoxFuture<'_, Response<Body>> {
140 if self.enabled.load(Ordering::SeqCst) {
141 self.responder.respond(request, response)
142 } else {
143 ready(response).boxed()
144 }
145 }
146}
147
148impl<H: HttpResponder> Toggleable<H> {
149 pub fn new(should_override: &AtomicToggle, responder: H) -> Self {
151 Self { enabled: Arc::clone(&should_override.0), responder }
152 }
153}
154
155pub struct ForRequestCount<H: HttpResponder> {
157 remaining: Mutex<u32>,
158 responder: H,
159}
160
161impl<H: HttpResponder> HttpResponder for ForRequestCount<H> {
162 fn respond<'a>(
163 &'a self,
164 request: &'a Request<Body>,
165 response: Response<Body>,
166 ) -> BoxFuture<'_, Response<Body>> {
167 let mut remaining = self.remaining.lock();
168 if *remaining > 0 {
169 *remaining -= 1;
170 drop(remaining);
171 self.responder.respond(request, response)
172 } else {
173 ready(response).boxed()
174 }
175 }
176}
177
178impl<H: HttpResponder> ForRequestCount<H> {
179 pub fn new(count: u32, responder: H) -> Self {
181 Self { remaining: Mutex::new(count), responder }
182 }
183}
184
185pub struct ForPath<H: HttpResponder> {
187 path: PathBuf,
188 responder: H,
189}
190
191impl<H: HttpResponder> HttpResponder for ForPath<H> {
192 fn respond<'a>(
193 &'a self,
194 request: &'a Request<Body>,
195 response: Response<Body>,
196 ) -> BoxFuture<'_, Response<Body>> {
197 if self.path == request.path() {
198 self.responder.respond(request, response)
199 } else {
200 ready(response).boxed()
201 }
202 }
203}
204
205impl<H: HttpResponder> ForPath<H> {
206 pub fn new(path: impl Into<PathBuf>, responder: H) -> Self {
208 Self { path: path.into(), responder }
209 }
210}
211
212pub struct ForPaths<H: HttpResponder> {
214 paths: HashSet<PathBuf>,
215 responder: H,
216}
217
218impl<H: HttpResponder> HttpResponder for ForPaths<H> {
219 fn respond<'a>(
220 &'a self,
221 request: &'a Request<Body>,
222 response: Response<Body>,
223 ) -> BoxFuture<'_, Response<Body>> {
224 if self.paths.contains(request.path()) {
225 self.responder.respond(request, response)
226 } else {
227 ready(response).boxed()
228 }
229 }
230}
231
232impl<H: HttpResponder> ForPaths<H> {
233 pub fn new(paths: HashSet<PathBuf>, responder: H) -> Self {
235 Self { paths, responder }
236 }
237}
238
239pub struct ForPathPrefix<H: HttpResponder> {
242 prefix: PathBuf,
243 responder: H,
244}
245
246impl<H: HttpResponder> HttpResponder for ForPathPrefix<H> {
247 fn respond<'a>(
248 &'a self,
249 request: &'a Request<Body>,
250 response: Response<Body>,
251 ) -> BoxFuture<'_, Response<Body>> {
252 if request.path().starts_with(&self.prefix) {
253 self.responder.respond(request, response)
254 } else {
255 ready(response).boxed()
256 }
257 }
258}
259
260impl<H: HttpResponder> ForPathPrefix<H> {
261 pub fn new(prefix: impl Into<PathBuf>, responder: H) -> Self {
264 Self { prefix: prefix.into(), responder }
265 }
266}
267
268pub struct ForPathSuffix<H: HttpResponder> {
273 suffix: PathBuf,
274 responder: H,
275}
276
277impl<H: HttpResponder> HttpResponder for ForPathSuffix<H> {
278 fn respond<'a>(
279 &'a self,
280 request: &'a Request<Body>,
281 response: Response<Body>,
282 ) -> BoxFuture<'_, Response<Body>> {
283 if request.path().ends_with(&self.suffix) {
284 self.responder.respond(request, response)
285 } else {
286 ready(response).boxed()
287 }
288 }
289}
290
291impl<H: HttpResponder> ForPathSuffix<H> {
292 pub fn new(suffix: impl Into<PathBuf>, responder: H) -> Self {
295 Self { suffix: suffix.into(), responder }
296 }
297}
298pub struct OncePerPath<H: HttpResponder> {
300 responder: H,
301 failed_paths: Mutex<HashSet<PathBuf>>,
302}
303
304impl<H: HttpResponder> HttpResponder for OncePerPath<H> {
305 fn respond<'a>(
306 &'a self,
307 request: &'a Request<Body>,
308 response: Response<Body>,
309 ) -> BoxFuture<'_, Response<Body>> {
310 if self.failed_paths.lock().insert(request.path().to_owned()) {
311 self.responder.respond(request, response)
312 } else {
313 ready(response).boxed()
314 }
315 }
316}
317
318impl<H: HttpResponder> OncePerPath<H> {
319 pub fn new(responder: H) -> Self {
321 Self { responder, failed_paths: Mutex::new(HashSet::new()) }
322 }
323}
324
325pub trait JsonTransformer: Send + Sync + Clone + 'static {
328 fn transform(&self, v: serde_json::Value) -> serde_json::Value;
330}
331
332impl<F> JsonTransformer for F
333where
334 F: Fn(serde_json::Value) -> serde_json::Value + Send + Sync + Clone + 'static,
335{
336 fn transform(&self, v: serde_json::Value) -> serde_json::Value {
337 (self)(v)
338 }
339}
340
341impl<T: JsonTransformer> HttpResponder for T {
343 fn respond(
344 &self,
345 _: &Request<Body>,
346 response: Response<Body>,
347 ) -> BoxFuture<'_, Response<Body>> {
348 async move {
349 let (mut parts, body) = response.into_parts();
350 parts.headers.remove(http::header::CONTENT_LENGTH);
351 let bytes = body_to_bytes(body).await;
352 let value = self.transform(serde_json::from_reader(bytes.as_slice()).unwrap());
353 let bytes = serde_json::to_vec(&value).unwrap();
354 Response::from_parts(parts, Body::from(bytes))
355 }
356 .boxed()
357 }
358}
359
360pub struct NotifyWhenRequested {
362 notify: mpsc::UnboundedSender<()>,
363}
364
365impl NotifyWhenRequested {
366 pub fn new() -> (Self, mpsc::UnboundedReceiver<()>) {
368 let (tx, rx) = mpsc::unbounded();
369 (Self { notify: tx }, rx)
370 }
371}
372
373impl HttpResponder for NotifyWhenRequested {
374 fn respond(
375 &self,
376 _: &Request<Body>,
377 response: Response<Body>,
378 ) -> BoxFuture<'_, Response<Body>> {
379 self.notify.unbounded_send(()).unwrap();
380 ready(response).boxed()
381 }
382}
383
384pub struct BlockedResponse {
386 path: PathBuf,
387 unblocker: oneshot::Sender<()>,
388}
389
390impl BlockedResponse {
391 pub fn path(&self) -> &Path {
393 &self.path
394 }
395
396 pub fn unblock(self) {
398 self.unblocker.send(()).expect("request to still be pending")
399 }
400}
401
402pub struct BlockResponseHeaders {
404 blocked_responses: mpsc::UnboundedSender<BlockedResponse>,
405}
406
407impl BlockResponseHeaders {
408 pub fn new() -> (Self, mpsc::UnboundedReceiver<BlockedResponse>) {
410 let (sender, receiver) = mpsc::unbounded();
411
412 (Self { blocked_responses: sender }, receiver)
413 }
414}
415
416impl HttpResponder for BlockResponseHeaders {
417 fn respond(
418 &self,
419 request: &Request<Body>,
420 response: Response<Body>,
421 ) -> BoxFuture<'_, Response<Body>> {
422 let path = request.path().to_owned();
425 let mut blocked_responses = self.blocked_responses.clone();
426 async move {
427 let (unblocker, waiter) = oneshot::channel();
428 blocked_responses
429 .send(BlockedResponse { path, unblocker })
430 .await
431 .expect("receiver to still exist");
432 waiter.await.expect("request to be unblocked");
433 response
434 }
435 .boxed()
436 }
437}
438
439pub struct BlockResponseBodyOnce {
442 #[allow(clippy::type_complexity)]
443 notify: Mutex<Option<oneshot::Sender<Box<dyn FnOnce() + Send>>>>,
444}
445
446impl BlockResponseBodyOnce {
447 pub fn new() -> (Self, oneshot::Receiver<Box<dyn FnOnce() + Send>>) {
449 let (sender, receiver) = oneshot::channel();
450
451 (Self { notify: Mutex::new(Some(sender)) }, receiver)
452 }
453}
454
455impl HttpResponder for BlockResponseBodyOnce {
456 fn respond(
457 &self,
458 _: &Request<Body>,
459 mut response: Response<Body>,
460 ) -> BoxFuture<'_, Response<Body>> {
461 let notify = self.notify.lock().take().expect("a single request for this path");
462
463 async move {
464 let (mut sender, new_body) = Body::channel();
467 let old_body = std::mem::replace(response.body_mut(), new_body);
468 let contents = body_to_bytes(old_body).await;
469
470 notify
472 .send(Box::new(move || {
473 sender.try_send_data(contents.into()).expect("sending body")
474 }))
475 .map_err(|_| ())
476 .expect("receiver to still exist");
477
478 response
481 }
482 .boxed()
483 }
484}
485
486async fn body_to_bytes(body: Body) -> Vec<u8> {
487 hyper::body::to_bytes(body).await.expect("body to bytes").to_vec()
488}
489
490pub struct OneByteShortThenError;
493
494impl HttpResponder for OneByteShortThenError {
495 fn respond(
496 &self,
497 _: &Request<Body>,
498 response: Response<Body>,
499 ) -> BoxFuture<'_, Response<Body>> {
500 async {
501 let (parts, body) = response.into_parts();
502 let mut bytes = body_to_bytes(body).await;
503 if bytes.pop().is_none() {
504 panic!("can't short 0 bytes");
505 }
506 Response::from_parts(
507 parts,
508 Body::wrap_stream(futures::stream::iter(vec![
509 Ok(bytes),
510 Err("all_but_one_byte_then_eror has sent all but one bytes".to_string()),
511 ])),
512 )
513 }
514 .boxed()
515 }
516}
517
518pub struct NBytesThenError {
521 n: usize,
522}
523
524impl NBytesThenError {
525 pub fn new(n: usize) -> Self {
527 Self { n }
528 }
529}
530impl HttpResponder for NBytesThenError {
531 fn respond(
532 &self,
533 _: &Request<Body>,
534 response: Response<Body>,
535 ) -> BoxFuture<'_, Response<Body>> {
536 let n = self.n;
537 async move {
538 let (parts, body) = response.into_parts();
539 let mut bytes = body_to_bytes(body).await;
540 let initial_len = bytes.len();
541 if initial_len <= n {
542 panic!("not enough bytes to shorten, {initial_len} {n}");
543 }
544 bytes.truncate(n);
545 Response::from_parts(
546 parts,
547 Body::wrap_stream(futures::stream::iter(vec![
548 Ok(bytes),
549 Err("all_but_one_byte_then_eror has sent all but one bytes".to_string()),
550 ])),
551 )
552 }
553 .boxed()
554 }
555}
556
557pub struct OneByteShortThenDisconnect;
560
561impl HttpResponder for OneByteShortThenDisconnect {
562 fn respond(
563 &self,
564 _: &Request<Body>,
565 response: Response<Body>,
566 ) -> BoxFuture<'_, Response<Body>> {
567 async {
568 let (parts, body) = response.into_parts();
569 let mut bytes = body_to_bytes(body).await;
570 if bytes.pop().is_none() {
571 panic!("can't short 0 bytes");
572 }
573 Response::from_parts(
574 parts,
575 Body::wrap_stream(futures::stream::iter(vec![Result::<Vec<u8>, String>::Ok(
576 bytes,
577 )])),
578 )
579 }
580 .boxed()
581 }
582}
583
584pub struct OneByteFlipped;
587
588impl HttpResponder for OneByteFlipped {
589 fn respond(
590 &self,
591 _: &Request<Body>,
592 response: Response<Body>,
593 ) -> BoxFuture<'_, Response<Body>> {
594 async {
595 let (parts, body) = response.into_parts();
596 let mut bytes = body_to_bytes(body).await;
597 if bytes.is_empty() {
598 panic!("can't flip 0 bytes");
599 }
600 bytes[0] = !bytes[0];
601 Response::from_parts(parts, Body::from(bytes))
602 }
603 .boxed()
604 }
605}
606
607pub struct Hang;
609
610impl HttpResponder for Hang {
611 fn respond(&self, _: &Request<Body>, _: Response<Body>) -> BoxFuture<'_, Response<Body>> {
612 pending().boxed()
613 }
614}
615
616pub struct HangBody;
618
619impl HttpResponder for HangBody {
620 fn respond(
621 &self,
622 _: &Request<Body>,
623 response: Response<Body>,
624 ) -> BoxFuture<'_, Response<Body>> {
625 async {
626 let (parts, _) = response.into_parts();
627 Response::from_parts(
628 parts,
629 Body::wrap_stream(futures::stream::pending::<Result<Vec<u8>, String>>()),
630 )
631 }
632 .boxed()
633 }
634}
635
636pub struct Once<H: HttpResponder> {
638 already_forwarded: AtomicBool,
639 responder: H,
640}
641
642impl<H: HttpResponder> HttpResponder for Once<H> {
643 fn respond<'a>(
644 &'a self,
645 request: &'a Request<Body>,
646 response: Response<Body>,
647 ) -> BoxFuture<'_, Response<Body>> {
648 if self.already_forwarded.fetch_or(true, Ordering::SeqCst) {
649 ready(response).boxed()
650 } else {
651 self.responder.respond(request, response)
652 }
653 }
654}
655
656impl<H: HttpResponder> Once<H> {
657 pub fn new(responder: H) -> Self {
659 Self { already_forwarded: AtomicBool::new(false), responder }
660 }
661}
662
663pub struct OverrideNth<H: HttpResponder> {
665 n: u32,
666 call_count: AtomicU32,
667 responder: H,
668}
669
670impl<H: HttpResponder> HttpResponder for OverrideNth<H> {
671 fn respond<'a>(
672 &'a self,
673 request: &'a Request<Body>,
674 response: Response<Body>,
675 ) -> BoxFuture<'_, Response<Body>> {
676 if self.call_count.fetch_add(1, Ordering::SeqCst) + 1 == self.n {
677 self.responder.respond(request, response)
678 } else {
679 ready(response).boxed()
680 }
681 }
682}
683
684impl<H: HttpResponder> OverrideNth<H> {
685 pub fn new(n: u32, responder: H) -> Self {
687 Self { n, call_count: AtomicU32::new(0), responder }
688 }
689}
690
691#[derive(Debug)]
693pub struct HistoryEntry {
694 uri_path: PathBuf,
695 headers: hyper::HeaderMap<hyper::header::HeaderValue>,
696}
697
698impl HistoryEntry {
699 pub fn uri_path(&self) -> &Path {
701 &self.uri_path
702 }
703
704 pub fn headers(&self) -> &http::HeaderMap<hyper::header::HeaderValue> {
706 &self.headers
707 }
708}
709
710pub struct History(Arc<Mutex<Vec<HistoryEntry>>>);
712
713impl History {
714 pub fn take(&self) -> Vec<HistoryEntry> {
716 std::mem::take(&mut self.0.lock())
717 }
718}
719
720pub struct Record {
722 history: History,
723}
724
725impl Record {
726 pub fn new() -> (Self, History) {
728 let history = Arc::new(Mutex::new(vec![]));
729 (Self { history: History(Arc::clone(&history)) }, History(history))
730 }
731}
732
733impl HttpResponder for Record {
734 fn respond<'a>(
735 &'a self,
736 request: &'a Request<Body>,
737 response: Response<Body>,
738 ) -> BoxFuture<'_, Response<Body>> {
739 self.history.0.lock().push(HistoryEntry {
740 uri_path: request.path().to_owned(),
741 headers: request.headers().clone(),
742 });
743 ready(response).boxed()
744 }
745}
746
747pub struct Filter<F: FilterFn, T: HttpResponder> {
749 filter: F,
750 handler: T,
751}
752
753pub trait FilterFn: Send + Sync + 'static {
755 fn filter(&self, request: &Request<Body>) -> bool;
757}
758
759impl<F> FilterFn for F
760where
761 F: Fn(&Request<Body>) -> bool + Send + Sync + 'static,
762{
763 fn filter(&self, request: &Request<Body>) -> bool {
764 (self)(request)
765 }
766}
767
768pub fn is_range_request(request: &Request<Body>) -> bool {
770 request.headers().get(http::header::RANGE).is_some()
771}
772
773impl<F: FilterFn, T: HttpResponder> Filter<F, T> {
774 pub fn new(filter: F, handler: T) -> Self {
776 Self { filter, handler }
777 }
778}
779
780impl<F: FilterFn, T: HttpResponder> HttpResponder for Filter<F, T> {
781 fn respond<'a>(
782 &'a self,
783 request: &'a Request<Body>,
784 response: Response<Body>,
785 ) -> BoxFuture<'_, Response<Body>> {
786 if self.filter.filter(request) {
787 self.handler.respond(request, response)
788 } else {
789 ready(response).boxed()
790 }
791 }
792}
793
794pub struct OverwriteStatusCode {
796 code: http::StatusCode,
797}
798
799impl OverwriteStatusCode {
800 pub fn new(code: http::StatusCode) -> Self {
802 Self { code }
803 }
804}
805
806impl HttpResponder for OverwriteStatusCode {
807 fn respond(
808 &self,
809 _: &Request<Body>,
810 mut response: Response<Body>,
811 ) -> BoxFuture<'_, Response<Body>> {
812 *response.status_mut() = self.code;
813 futures::future::ready(response).boxed()
814 }
815}
816
817pub struct Chain {
819 responders: Vec<Box<dyn HttpResponder>>,
820}
821
822impl Chain {
823 pub fn new(responders: Vec<Box<dyn HttpResponder>>) -> Self {
825 Self { responders }
826 }
827}
828
829impl HttpResponder for Chain {
830 fn respond<'a>(
831 &'a self,
832 request: &'a Request<Body>,
833 mut response: Response<Body>,
834 ) -> BoxFuture<'a, Response<Body>> {
835 async move {
836 for responder in self.responders.iter() {
837 response = responder.respond(request, response).await;
838 }
839 response
840 }
841 .boxed()
842 }
843}
844
845pub struct FailOneThenTemporarilyBlock {
851 path_to_fail: Arc<Mutex<Option<PathBuf>>>,
852 block_until: Shared<oneshot::Receiver<()>>,
853}
854
855impl FailOneThenTemporarilyBlock {
856 pub fn new() -> (Self, oneshot::Sender<()>) {
858 let (send, recv) = oneshot::channel();
859 (Self { path_to_fail: Arc::new(Mutex::new(None)), block_until: recv.shared() }, send)
860 }
861}
862
863impl HttpResponder for FailOneThenTemporarilyBlock {
864 fn respond(&self, request: &Request<Body>, _: Response<Body>) -> BoxFuture<'_, Response<Body>> {
865 let response =
866 Response::builder().status(StatusCode::NOT_FOUND).body(Body::empty()).unwrap();
867 match &mut *self.path_to_fail.lock() {
868 o @ None => {
869 *o = Some(request.path().to_owned());
870 ready(response).boxed()
871 }
872 Some(path_to_fail) if path_to_fail == request.path() => ready(response).boxed(),
873 _ => {
874 let block_until = self.block_until.clone();
875 async move {
876 block_until.await.unwrap();
877 response
878 }
879 }
880 .boxed(),
881 }
882 }
883}