hyper/client/
pool.rs

1use std::collections::{HashMap, HashSet, VecDeque};
2use std::error::Error as StdError;
3use std::fmt;
4use std::ops::{Deref, DerefMut};
5use std::sync::{Arc, Mutex, Weak};
6
7#[cfg(not(feature = "runtime"))]
8use std::time::{Duration, Instant};
9
10use futures_channel::oneshot;
11#[cfg(feature = "runtime")]
12use tokio::time::{Duration, Instant, Interval};
13use tracing::{debug, trace};
14
15use super::client::Ver;
16use crate::common::{exec::Exec, task, Future, Pin, Poll, Unpin};
17
18// FIXME: allow() required due to `impl Trait` leaking types to this lint
19#[allow(missing_debug_implementations)]
20pub(super) struct Pool<T> {
21    // If the pool is disabled, this is None.
22    inner: Option<Arc<Mutex<PoolInner<T>>>>,
23}
24
25// Before using a pooled connection, make sure the sender is not dead.
26//
27// This is a trait to allow the `client::pool::tests` to work for `i32`.
28//
29// See https://github.com/hyperium/hyper/issues/1429
30pub(super) trait Poolable: Unpin + Send + Sized + 'static {
31    fn is_open(&self) -> bool;
32    /// Reserve this connection.
33    ///
34    /// Allows for HTTP/2 to return a shared reservation.
35    fn reserve(self) -> Reservation<Self>;
36    fn can_share(&self) -> bool;
37}
38
39/// When checking out a pooled connection, it might be that the connection
40/// only supports a single reservation, or it might be usable for many.
41///
42/// Specifically, HTTP/1 requires a unique reservation, but HTTP/2 can be
43/// used for multiple requests.
44// FIXME: allow() required due to `impl Trait` leaking types to this lint
45#[allow(missing_debug_implementations)]
46pub(super) enum Reservation<T> {
47    /// This connection could be used multiple times, the first one will be
48    /// reinserted into the `idle` pool, and the second will be given to
49    /// the `Checkout`.
50    #[cfg(feature = "http2")]
51    Shared(T, T),
52    /// This connection requires unique access. It will be returned after
53    /// use is complete.
54    Unique(T),
55}
56
57/// Simple type alias in case the key type needs to be adjusted.
58pub(super) type Key = (http::uri::Scheme, http::uri::Authority); //Arc<String>;
59
60struct PoolInner<T> {
61    // A flag that a connection is being established, and the connection
62    // should be shared. This prevents making multiple HTTP/2 connections
63    // to the same host.
64    connecting: HashSet<Key>,
65    // These are internal Conns sitting in the event loop in the KeepAlive
66    // state, waiting to receive a new Request to send on the socket.
67    idle: HashMap<Key, Vec<Idle<T>>>,
68    max_idle_per_host: usize,
69    // These are outstanding Checkouts that are waiting for a socket to be
70    // able to send a Request one. This is used when "racing" for a new
71    // connection.
72    //
73    // The Client starts 2 tasks, 1 to connect a new socket, and 1 to wait
74    // for the Pool to receive an idle Conn. When a Conn becomes idle,
75    // this list is checked for any parked Checkouts, and tries to notify
76    // them that the Conn could be used instead of waiting for a brand new
77    // connection.
78    waiters: HashMap<Key, VecDeque<oneshot::Sender<T>>>,
79    // A oneshot channel is used to allow the interval to be notified when
80    // the Pool completely drops. That way, the interval can cancel immediately.
81    #[cfg(feature = "runtime")]
82    idle_interval_ref: Option<oneshot::Sender<crate::common::Never>>,
83    #[cfg(feature = "runtime")]
84    exec: Exec,
85    timeout: Option<Duration>,
86}
87
88// This is because `Weak::new()` *allocates* space for `T`, even if it
89// doesn't need it!
90struct WeakOpt<T>(Option<Weak<T>>);
91
92#[derive(Clone, Copy, Debug)]
93pub(super) struct Config {
94    pub(super) idle_timeout: Option<Duration>,
95    pub(super) max_idle_per_host: usize,
96}
97
98impl Config {
99    pub(super) fn is_enabled(&self) -> bool {
100        self.max_idle_per_host > 0
101    }
102}
103
104impl<T> Pool<T> {
105    pub(super) fn new(config: Config, __exec: &Exec) -> Pool<T> {
106        let inner = if config.is_enabled() {
107            Some(Arc::new(Mutex::new(PoolInner {
108                connecting: HashSet::new(),
109                idle: HashMap::new(),
110                #[cfg(feature = "runtime")]
111                idle_interval_ref: None,
112                max_idle_per_host: config.max_idle_per_host,
113                waiters: HashMap::new(),
114                #[cfg(feature = "runtime")]
115                exec: __exec.clone(),
116                timeout: config.idle_timeout,
117            })))
118        } else {
119            None
120        };
121
122        Pool { inner }
123    }
124
125    fn is_enabled(&self) -> bool {
126        self.inner.is_some()
127    }
128
129    #[cfg(test)]
130    pub(super) fn no_timer(&self) {
131        // Prevent an actual interval from being created for this pool...
132        #[cfg(feature = "runtime")]
133        {
134            let mut inner = self.inner.as_ref().unwrap().lock().unwrap();
135            assert!(inner.idle_interval_ref.is_none(), "timer already spawned");
136            let (tx, _) = oneshot::channel();
137            inner.idle_interval_ref = Some(tx);
138        }
139    }
140}
141
142impl<T: Poolable> Pool<T> {
143    /// Returns a `Checkout` which is a future that resolves if an idle
144    /// connection becomes available.
145    pub(super) fn checkout(&self, key: Key) -> Checkout<T> {
146        Checkout {
147            key,
148            pool: self.clone(),
149            waiter: None,
150        }
151    }
152
153    /// Ensure that there is only ever 1 connecting task for HTTP/2
154    /// connections. This does nothing for HTTP/1.
155    pub(super) fn connecting(&self, key: &Key, ver: Ver) -> Option<Connecting<T>> {
156        if ver == Ver::Http2 {
157            if let Some(ref enabled) = self.inner {
158                let mut inner = enabled.lock().unwrap();
159                return if inner.connecting.insert(key.clone()) {
160                    let connecting = Connecting {
161                        key: key.clone(),
162                        pool: WeakOpt::downgrade(enabled),
163                    };
164                    Some(connecting)
165                } else {
166                    trace!("HTTP/2 connecting already in progress for {:?}", key);
167                    None
168                };
169            }
170        }
171
172        // else
173        Some(Connecting {
174            key: key.clone(),
175            // in HTTP/1's case, there is never a lock, so we don't
176            // need to do anything in Drop.
177            pool: WeakOpt::none(),
178        })
179    }
180
181    #[cfg(test)]
182    fn locked(&self) -> std::sync::MutexGuard<'_, PoolInner<T>> {
183        self.inner.as_ref().expect("enabled").lock().expect("lock")
184    }
185
186    /* Used in client/tests.rs...
187    #[cfg(feature = "runtime")]
188    #[cfg(test)]
189    pub(super) fn h1_key(&self, s: &str) -> Key {
190        Arc::new(s.to_string())
191    }
192
193    #[cfg(feature = "runtime")]
194    #[cfg(test)]
195    pub(super) fn idle_count(&self, key: &Key) -> usize {
196        self
197            .locked()
198            .idle
199            .get(key)
200            .map(|list| list.len())
201            .unwrap_or(0)
202    }
203    */
204
205    pub(super) fn pooled(
206        &self,
207        #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut connecting: Connecting<T>,
208        value: T,
209    ) -> Pooled<T> {
210        let (value, pool_ref) = if let Some(ref enabled) = self.inner {
211            match value.reserve() {
212                #[cfg(feature = "http2")]
213                Reservation::Shared(to_insert, to_return) => {
214                    let mut inner = enabled.lock().unwrap();
215                    inner.put(connecting.key.clone(), to_insert, enabled);
216                    // Do this here instead of Drop for Connecting because we
217                    // already have a lock, no need to lock the mutex twice.
218                    inner.connected(&connecting.key);
219                    // prevent the Drop of Connecting from repeating inner.connected()
220                    connecting.pool = WeakOpt::none();
221
222                    // Shared reservations don't need a reference to the pool,
223                    // since the pool always keeps a copy.
224                    (to_return, WeakOpt::none())
225                }
226                Reservation::Unique(value) => {
227                    // Unique reservations must take a reference to the pool
228                    // since they hope to reinsert once the reservation is
229                    // completed
230                    (value, WeakOpt::downgrade(enabled))
231                }
232            }
233        } else {
234            // If pool is not enabled, skip all the things...
235
236            // The Connecting should have had no pool ref
237            debug_assert!(connecting.pool.upgrade().is_none());
238
239            (value, WeakOpt::none())
240        };
241        Pooled {
242            key: connecting.key.clone(),
243            is_reused: false,
244            pool: pool_ref,
245            value: Some(value),
246        }
247    }
248
249    fn reuse(&self, key: &Key, value: T) -> Pooled<T> {
250        debug!("reuse idle connection for {:?}", key);
251        // TODO: unhack this
252        // In Pool::pooled(), which is used for inserting brand new connections,
253        // there's some code that adjusts the pool reference taken depending
254        // on if the Reservation can be shared or is unique. By the time
255        // reuse() is called, the reservation has already been made, and
256        // we just have the final value, without knowledge of if this is
257        // unique or shared. So, the hack is to just assume Ver::Http2 means
258        // shared... :(
259        let mut pool_ref = WeakOpt::none();
260        if !value.can_share() {
261            if let Some(ref enabled) = self.inner {
262                pool_ref = WeakOpt::downgrade(enabled);
263            }
264        }
265
266        Pooled {
267            is_reused: true,
268            key: key.clone(),
269            pool: pool_ref,
270            value: Some(value),
271        }
272    }
273}
274
275/// Pop off this list, looking for a usable connection that hasn't expired.
276struct IdlePopper<'a, T> {
277    key: &'a Key,
278    list: &'a mut Vec<Idle<T>>,
279}
280
281impl<'a, T: Poolable + 'a> IdlePopper<'a, T> {
282    fn pop(self, expiration: &Expiration) -> Option<Idle<T>> {
283        while let Some(entry) = self.list.pop() {
284            // If the connection has been closed, or is older than our idle
285            // timeout, simply drop it and keep looking...
286            if !entry.value.is_open() {
287                trace!("removing closed connection for {:?}", self.key);
288                continue;
289            }
290            // TODO: Actually, since the `idle` list is pushed to the end always,
291            // that would imply that if *this* entry is expired, then anything
292            // "earlier" in the list would *have* to be expired also... Right?
293            //
294            // In that case, we could just break out of the loop and drop the
295            // whole list...
296            if expiration.expires(entry.idle_at) {
297                trace!("removing expired connection for {:?}", self.key);
298                continue;
299            }
300
301            let value = match entry.value.reserve() {
302                #[cfg(feature = "http2")]
303                Reservation::Shared(to_reinsert, to_checkout) => {
304                    self.list.push(Idle {
305                        idle_at: Instant::now(),
306                        value: to_reinsert,
307                    });
308                    to_checkout
309                }
310                Reservation::Unique(unique) => unique,
311            };
312
313            return Some(Idle {
314                idle_at: entry.idle_at,
315                value,
316            });
317        }
318
319        None
320    }
321}
322
323impl<T: Poolable> PoolInner<T> {
324    fn put(&mut self, key: Key, value: T, __pool_ref: &Arc<Mutex<PoolInner<T>>>) {
325        if value.can_share() && self.idle.contains_key(&key) {
326            trace!("put; existing idle HTTP/2 connection for {:?}", key);
327            return;
328        }
329        trace!("put; add idle connection for {:?}", key);
330        let mut remove_waiters = false;
331        let mut value = Some(value);
332        if let Some(waiters) = self.waiters.get_mut(&key) {
333            while let Some(tx) = waiters.pop_front() {
334                if !tx.is_canceled() {
335                    let reserved = value.take().expect("value already sent");
336                    let reserved = match reserved.reserve() {
337                        #[cfg(feature = "http2")]
338                        Reservation::Shared(to_keep, to_send) => {
339                            value = Some(to_keep);
340                            to_send
341                        }
342                        Reservation::Unique(uniq) => uniq,
343                    };
344                    match tx.send(reserved) {
345                        Ok(()) => {
346                            if value.is_none() {
347                                break;
348                            } else {
349                                continue;
350                            }
351                        }
352                        Err(e) => {
353                            value = Some(e);
354                        }
355                    }
356                }
357
358                trace!("put; removing canceled waiter for {:?}", key);
359            }
360            remove_waiters = waiters.is_empty();
361        }
362        if remove_waiters {
363            self.waiters.remove(&key);
364        }
365
366        match value {
367            Some(value) => {
368                // borrow-check scope...
369                {
370                    let idle_list = self.idle.entry(key.clone()).or_insert_with(Vec::new);
371                    if self.max_idle_per_host <= idle_list.len() {
372                        trace!("max idle per host for {:?}, dropping connection", key);
373                        return;
374                    }
375
376                    debug!("pooling idle connection for {:?}", key);
377                    idle_list.push(Idle {
378                        value,
379                        idle_at: Instant::now(),
380                    });
381                }
382
383                #[cfg(feature = "runtime")]
384                {
385                    self.spawn_idle_interval(__pool_ref);
386                }
387            }
388            None => trace!("put; found waiter for {:?}", key),
389        }
390    }
391
392    /// A `Connecting` task is complete. Not necessarily successfully,
393    /// but the lock is going away, so clean up.
394    fn connected(&mut self, key: &Key) {
395        let existed = self.connecting.remove(key);
396        debug_assert!(existed, "Connecting dropped, key not in pool.connecting");
397        // cancel any waiters. if there are any, it's because
398        // this Connecting task didn't complete successfully.
399        // those waiters would never receive a connection.
400        self.waiters.remove(key);
401    }
402
403    #[cfg(feature = "runtime")]
404    fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T>>>) {
405        let (dur, rx) = {
406            if self.idle_interval_ref.is_some() {
407                return;
408            }
409
410            if let Some(dur) = self.timeout {
411                let (tx, rx) = oneshot::channel();
412                self.idle_interval_ref = Some(tx);
413                (dur, rx)
414            } else {
415                return;
416            }
417        };
418
419        let interval = IdleTask {
420            interval: tokio::time::interval(dur),
421            pool: WeakOpt::downgrade(pool_ref),
422            pool_drop_notifier: rx,
423        };
424
425        self.exec.execute(interval);
426    }
427}
428
429impl<T> PoolInner<T> {
430    /// Any `FutureResponse`s that were created will have made a `Checkout`,
431    /// and possibly inserted into the pool that it is waiting for an idle
432    /// connection. If a user ever dropped that future, we need to clean out
433    /// those parked senders.
434    fn clean_waiters(&mut self, key: &Key) {
435        let mut remove_waiters = false;
436        if let Some(waiters) = self.waiters.get_mut(key) {
437            waiters.retain(|tx| !tx.is_canceled());
438            remove_waiters = waiters.is_empty();
439        }
440        if remove_waiters {
441            self.waiters.remove(key);
442        }
443    }
444}
445
446#[cfg(feature = "runtime")]
447impl<T: Poolable> PoolInner<T> {
448    /// This should *only* be called by the IdleTask
449    fn clear_expired(&mut self) {
450        let dur = self.timeout.expect("interval assumes timeout");
451
452        let now = Instant::now();
453        //self.last_idle_check_at = now;
454
455        self.idle.retain(|key, values| {
456            values.retain(|entry| {
457                if !entry.value.is_open() {
458                    trace!("idle interval evicting closed for {:?}", key);
459                    return false;
460                }
461
462                // Avoid `Instant::sub` to avoid issues like rust-lang/rust#86470.
463                if now.saturating_duration_since(entry.idle_at) > dur {
464                    trace!("idle interval evicting expired for {:?}", key);
465                    return false;
466                }
467
468                // Otherwise, keep this value...
469                true
470            });
471
472            // returning false evicts this key/val
473            !values.is_empty()
474        });
475    }
476}
477
478impl<T> Clone for Pool<T> {
479    fn clone(&self) -> Pool<T> {
480        Pool {
481            inner: self.inner.clone(),
482        }
483    }
484}
485
486/// A wrapped poolable value that tries to reinsert to the Pool on Drop.
487// Note: The bounds `T: Poolable` is needed for the Drop impl.
488pub(super) struct Pooled<T: Poolable> {
489    value: Option<T>,
490    is_reused: bool,
491    key: Key,
492    pool: WeakOpt<Mutex<PoolInner<T>>>,
493}
494
495impl<T: Poolable> Pooled<T> {
496    pub(super) fn is_reused(&self) -> bool {
497        self.is_reused
498    }
499
500    pub(super) fn is_pool_enabled(&self) -> bool {
501        self.pool.0.is_some()
502    }
503
504    fn as_ref(&self) -> &T {
505        self.value.as_ref().expect("not dropped")
506    }
507
508    fn as_mut(&mut self) -> &mut T {
509        self.value.as_mut().expect("not dropped")
510    }
511}
512
513impl<T: Poolable> Deref for Pooled<T> {
514    type Target = T;
515    fn deref(&self) -> &T {
516        self.as_ref()
517    }
518}
519
520impl<T: Poolable> DerefMut for Pooled<T> {
521    fn deref_mut(&mut self) -> &mut T {
522        self.as_mut()
523    }
524}
525
526impl<T: Poolable> Drop for Pooled<T> {
527    fn drop(&mut self) {
528        if let Some(value) = self.value.take() {
529            if !value.is_open() {
530                // If we *already* know the connection is done here,
531                // it shouldn't be re-inserted back into the pool.
532                return;
533            }
534
535            if let Some(pool) = self.pool.upgrade() {
536                if let Ok(mut inner) = pool.lock() {
537                    inner.put(self.key.clone(), value, &pool);
538                }
539            } else if !value.can_share() {
540                trace!("pool dropped, dropping pooled ({:?})", self.key);
541            }
542            // Ver::Http2 is already in the Pool (or dead), so we wouldn't
543            // have an actual reference to the Pool.
544        }
545    }
546}
547
548impl<T: Poolable> fmt::Debug for Pooled<T> {
549    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
550        f.debug_struct("Pooled").field("key", &self.key).finish()
551    }
552}
553
554struct Idle<T> {
555    idle_at: Instant,
556    value: T,
557}
558
559// FIXME: allow() required due to `impl Trait` leaking types to this lint
560#[allow(missing_debug_implementations)]
561pub(super) struct Checkout<T> {
562    key: Key,
563    pool: Pool<T>,
564    waiter: Option<oneshot::Receiver<T>>,
565}
566
567#[derive(Debug)]
568pub(super) struct CheckoutIsClosedError;
569
570impl StdError for CheckoutIsClosedError {}
571
572impl fmt::Display for CheckoutIsClosedError {
573    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
574        f.write_str("checked out connection was closed")
575    }
576}
577
578impl<T: Poolable> Checkout<T> {
579    fn poll_waiter(
580        &mut self,
581        cx: &mut task::Context<'_>,
582    ) -> Poll<Option<crate::Result<Pooled<T>>>> {
583        if let Some(mut rx) = self.waiter.take() {
584            match Pin::new(&mut rx).poll(cx) {
585                Poll::Ready(Ok(value)) => {
586                    if value.is_open() {
587                        Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value))))
588                    } else {
589                        Poll::Ready(Some(Err(
590                            crate::Error::new_canceled().with(CheckoutIsClosedError)
591                        )))
592                    }
593                }
594                Poll::Pending => {
595                    self.waiter = Some(rx);
596                    Poll::Pending
597                }
598                Poll::Ready(Err(_canceled)) => Poll::Ready(Some(Err(
599                    crate::Error::new_canceled().with("request has been canceled")
600                ))),
601            }
602        } else {
603            Poll::Ready(None)
604        }
605    }
606
607    fn checkout(&mut self, cx: &mut task::Context<'_>) -> Option<Pooled<T>> {
608        let entry = {
609            let mut inner = self.pool.inner.as_ref()?.lock().unwrap();
610            let expiration = Expiration::new(inner.timeout);
611            let maybe_entry = inner.idle.get_mut(&self.key).and_then(|list| {
612                trace!("take? {:?}: expiration = {:?}", self.key, expiration.0);
613                // A block to end the mutable borrow on list,
614                // so the map below can check is_empty()
615                {
616                    let popper = IdlePopper {
617                        key: &self.key,
618                        list,
619                    };
620                    popper.pop(&expiration)
621                }
622                .map(|e| (e, list.is_empty()))
623            });
624
625            let (entry, empty) = if let Some((e, empty)) = maybe_entry {
626                (Some(e), empty)
627            } else {
628                // No entry found means nuke the list for sure.
629                (None, true)
630            };
631            if empty {
632                //TODO: This could be done with the HashMap::entry API instead.
633                inner.idle.remove(&self.key);
634            }
635
636            if entry.is_none() && self.waiter.is_none() {
637                let (tx, mut rx) = oneshot::channel();
638                trace!("checkout waiting for idle connection: {:?}", self.key);
639                inner
640                    .waiters
641                    .entry(self.key.clone())
642                    .or_insert_with(VecDeque::new)
643                    .push_back(tx);
644
645                // register the waker with this oneshot
646                assert!(Pin::new(&mut rx).poll(cx).is_pending());
647                self.waiter = Some(rx);
648            }
649
650            entry
651        };
652
653        entry.map(|e| self.pool.reuse(&self.key, e.value))
654    }
655}
656
657impl<T: Poolable> Future for Checkout<T> {
658    type Output = crate::Result<Pooled<T>>;
659
660    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
661        if let Some(pooled) = ready!(self.poll_waiter(cx)?) {
662            return Poll::Ready(Ok(pooled));
663        }
664
665        if let Some(pooled) = self.checkout(cx) {
666            Poll::Ready(Ok(pooled))
667        } else if !self.pool.is_enabled() {
668            Poll::Ready(Err(crate::Error::new_canceled().with("pool is disabled")))
669        } else {
670            // There's a new waiter, already registered in self.checkout()
671            debug_assert!(self.waiter.is_some());
672            Poll::Pending
673        }
674    }
675}
676
677impl<T> Drop for Checkout<T> {
678    fn drop(&mut self) {
679        if self.waiter.take().is_some() {
680            trace!("checkout dropped for {:?}", self.key);
681            if let Some(Ok(mut inner)) = self.pool.inner.as_ref().map(|i| i.lock()) {
682                inner.clean_waiters(&self.key);
683            }
684        }
685    }
686}
687
688// FIXME: allow() required due to `impl Trait` leaking types to this lint
689#[allow(missing_debug_implementations)]
690pub(super) struct Connecting<T: Poolable> {
691    key: Key,
692    pool: WeakOpt<Mutex<PoolInner<T>>>,
693}
694
695impl<T: Poolable> Connecting<T> {
696    pub(super) fn alpn_h2(self, pool: &Pool<T>) -> Option<Self> {
697        debug_assert!(
698            self.pool.0.is_none(),
699            "Connecting::alpn_h2 but already Http2"
700        );
701
702        pool.connecting(&self.key, Ver::Http2)
703    }
704}
705
706impl<T: Poolable> Drop for Connecting<T> {
707    fn drop(&mut self) {
708        if let Some(pool) = self.pool.upgrade() {
709            // No need to panic on drop, that could abort!
710            if let Ok(mut inner) = pool.lock() {
711                inner.connected(&self.key);
712            }
713        }
714    }
715}
716
717struct Expiration(Option<Duration>);
718
719impl Expiration {
720    fn new(dur: Option<Duration>) -> Expiration {
721        Expiration(dur)
722    }
723
724    fn expires(&self, instant: Instant) -> bool {
725        match self.0 {
726            // Avoid `Instant::elapsed` to avoid issues like rust-lang/rust#86470.
727            Some(timeout) => Instant::now().saturating_duration_since(instant) > timeout,
728            None => false,
729        }
730    }
731}
732
733#[cfg(feature = "runtime")]
734pin_project_lite::pin_project! {
735    struct IdleTask<T> {
736        #[pin]
737        interval: Interval,
738        pool: WeakOpt<Mutex<PoolInner<T>>>,
739        // This allows the IdleTask to be notified as soon as the entire
740        // Pool is fully dropped, and shutdown. This channel is never sent on,
741        // but Err(Canceled) will be received when the Pool is dropped.
742        #[pin]
743        pool_drop_notifier: oneshot::Receiver<crate::common::Never>,
744    }
745}
746
747#[cfg(feature = "runtime")]
748impl<T: Poolable + 'static> Future for IdleTask<T> {
749    type Output = ();
750
751    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
752        let mut this = self.project();
753        loop {
754            match this.pool_drop_notifier.as_mut().poll(cx) {
755                Poll::Ready(Ok(n)) => match n {},
756                Poll::Pending => (),
757                Poll::Ready(Err(_canceled)) => {
758                    trace!("pool closed, canceling idle interval");
759                    return Poll::Ready(());
760                }
761            }
762
763            ready!(this.interval.as_mut().poll_tick(cx));
764
765            if let Some(inner) = this.pool.upgrade() {
766                if let Ok(mut inner) = inner.lock() {
767                    trace!("idle interval checking for expired");
768                    inner.clear_expired();
769                    continue;
770                }
771            }
772            return Poll::Ready(());
773        }
774    }
775}
776
777impl<T> WeakOpt<T> {
778    fn none() -> Self {
779        WeakOpt(None)
780    }
781
782    fn downgrade(arc: &Arc<T>) -> Self {
783        WeakOpt(Some(Arc::downgrade(arc)))
784    }
785
786    fn upgrade(&self) -> Option<Arc<T>> {
787        self.0.as_ref().and_then(Weak::upgrade)
788    }
789}
790
791#[cfg(test)]
792mod tests {
793    use std::task::Poll;
794    use std::time::Duration;
795
796    use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
797    use crate::common::{exec::Exec, task, Future, Pin};
798
799    /// Test unique reservations.
800    #[derive(Debug, PartialEq, Eq)]
801    struct Uniq<T>(T);
802
803    impl<T: Send + 'static + Unpin> Poolable for Uniq<T> {
804        fn is_open(&self) -> bool {
805            true
806        }
807
808        fn reserve(self) -> Reservation<Self> {
809            Reservation::Unique(self)
810        }
811
812        fn can_share(&self) -> bool {
813            false
814        }
815    }
816
817    fn c<T: Poolable>(key: Key) -> Connecting<T> {
818        Connecting {
819            key,
820            pool: WeakOpt::none(),
821        }
822    }
823
824    fn host_key(s: &str) -> Key {
825        (http::uri::Scheme::HTTP, s.parse().expect("host key"))
826    }
827
828    fn pool_no_timer<T>() -> Pool<T> {
829        pool_max_idle_no_timer(::std::usize::MAX)
830    }
831
832    fn pool_max_idle_no_timer<T>(max_idle: usize) -> Pool<T> {
833        let pool = Pool::new(
834            super::Config {
835                idle_timeout: Some(Duration::from_millis(100)),
836                max_idle_per_host: max_idle,
837            },
838            &Exec::Default,
839        );
840        pool.no_timer();
841        pool
842    }
843
844    #[tokio::test]
845    async fn test_pool_checkout_smoke() {
846        let pool = pool_no_timer();
847        let key = host_key("foo");
848        let pooled = pool.pooled(c(key.clone()), Uniq(41));
849
850        drop(pooled);
851
852        match pool.checkout(key).await {
853            Ok(pooled) => assert_eq!(*pooled, Uniq(41)),
854            Err(_) => panic!("not ready"),
855        };
856    }
857
858    /// Helper to check if the future is ready after polling once.
859    struct PollOnce<'a, F>(&'a mut F);
860
861    impl<F, T, U> Future for PollOnce<'_, F>
862    where
863        F: Future<Output = Result<T, U>> + Unpin,
864    {
865        type Output = Option<()>;
866
867        fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
868            match Pin::new(&mut self.0).poll(cx) {
869                Poll::Ready(Ok(_)) => Poll::Ready(Some(())),
870                Poll::Ready(Err(_)) => Poll::Ready(Some(())),
871                Poll::Pending => Poll::Ready(None),
872            }
873        }
874    }
875
876    #[tokio::test]
877    async fn test_pool_checkout_returns_none_if_expired() {
878        let pool = pool_no_timer();
879        let key = host_key("foo");
880        let pooled = pool.pooled(c(key.clone()), Uniq(41));
881
882        drop(pooled);
883        tokio::time::sleep(pool.locked().timeout.unwrap()).await;
884        let mut checkout = pool.checkout(key);
885        let poll_once = PollOnce(&mut checkout);
886        let is_not_ready = poll_once.await.is_none();
887        assert!(is_not_ready);
888    }
889
890    #[cfg(feature = "runtime")]
891    #[tokio::test]
892    async fn test_pool_checkout_removes_expired() {
893        let pool = pool_no_timer();
894        let key = host_key("foo");
895
896        pool.pooled(c(key.clone()), Uniq(41));
897        pool.pooled(c(key.clone()), Uniq(5));
898        pool.pooled(c(key.clone()), Uniq(99));
899
900        assert_eq!(
901            pool.locked().idle.get(&key).map(|entries| entries.len()),
902            Some(3)
903        );
904        tokio::time::sleep(pool.locked().timeout.unwrap()).await;
905
906        let mut checkout = pool.checkout(key.clone());
907        let poll_once = PollOnce(&mut checkout);
908        // checkout.await should clean out the expired
909        poll_once.await;
910        assert!(pool.locked().idle.get(&key).is_none());
911    }
912
913    #[test]
914    fn test_pool_max_idle_per_host() {
915        let pool = pool_max_idle_no_timer(2);
916        let key = host_key("foo");
917
918        pool.pooled(c(key.clone()), Uniq(41));
919        pool.pooled(c(key.clone()), Uniq(5));
920        pool.pooled(c(key.clone()), Uniq(99));
921
922        // pooled and dropped 3, max_idle should only allow 2
923        assert_eq!(
924            pool.locked().idle.get(&key).map(|entries| entries.len()),
925            Some(2)
926        );
927    }
928
929    #[cfg(feature = "runtime")]
930    #[tokio::test]
931    async fn test_pool_timer_removes_expired() {
932        let _ = pretty_env_logger::try_init();
933        tokio::time::pause();
934
935        let pool = Pool::new(
936            super::Config {
937                idle_timeout: Some(Duration::from_millis(10)),
938                max_idle_per_host: std::usize::MAX,
939            },
940            &Exec::Default,
941        );
942
943        let key = host_key("foo");
944
945        pool.pooled(c(key.clone()), Uniq(41));
946        pool.pooled(c(key.clone()), Uniq(5));
947        pool.pooled(c(key.clone()), Uniq(99));
948
949        assert_eq!(
950            pool.locked().idle.get(&key).map(|entries| entries.len()),
951            Some(3)
952        );
953
954        // Let the timer tick passed the expiration...
955        tokio::time::advance(Duration::from_millis(30)).await;
956        // Yield so the Interval can reap...
957        tokio::task::yield_now().await;
958
959        assert!(pool.locked().idle.get(&key).is_none());
960    }
961
962    #[tokio::test]
963    async fn test_pool_checkout_task_unparked() {
964        use futures_util::future::join;
965        use futures_util::FutureExt;
966
967        let pool = pool_no_timer();
968        let key = host_key("foo");
969        let pooled = pool.pooled(c(key.clone()), Uniq(41));
970
971        let checkout = join(pool.checkout(key), async {
972            // the checkout future will park first,
973            // and then this lazy future will be polled, which will insert
974            // the pooled back into the pool
975            //
976            // this test makes sure that doing so will unpark the checkout
977            drop(pooled);
978        })
979        .map(|(entry, _)| entry);
980
981        assert_eq!(*checkout.await.unwrap(), Uniq(41));
982    }
983
984    #[tokio::test]
985    async fn test_pool_checkout_drop_cleans_up_waiters() {
986        let pool = pool_no_timer::<Uniq<i32>>();
987        let key = host_key("foo");
988
989        let mut checkout1 = pool.checkout(key.clone());
990        let mut checkout2 = pool.checkout(key.clone());
991
992        let poll_once1 = PollOnce(&mut checkout1);
993        let poll_once2 = PollOnce(&mut checkout2);
994
995        // first poll needed to get into Pool's parked
996        poll_once1.await;
997        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
998        poll_once2.await;
999        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 2);
1000
1001        // on drop, clean up Pool
1002        drop(checkout1);
1003        assert_eq!(pool.locked().waiters.get(&key).unwrap().len(), 1);
1004
1005        drop(checkout2);
1006        assert!(pool.locked().waiters.get(&key).is_none());
1007    }
1008
1009    #[derive(Debug)]
1010    struct CanClose {
1011        #[allow(unused)]
1012        val: i32,
1013        closed: bool,
1014    }
1015
1016    impl Poolable for CanClose {
1017        fn is_open(&self) -> bool {
1018            !self.closed
1019        }
1020
1021        fn reserve(self) -> Reservation<Self> {
1022            Reservation::Unique(self)
1023        }
1024
1025        fn can_share(&self) -> bool {
1026            false
1027        }
1028    }
1029
1030    #[test]
1031    fn pooled_drop_if_closed_doesnt_reinsert() {
1032        let pool = pool_no_timer();
1033        let key = host_key("foo");
1034        pool.pooled(
1035            c(key.clone()),
1036            CanClose {
1037                val: 57,
1038                closed: true,
1039            },
1040        );
1041
1042        assert!(!pool.locked().idle.contains_key(&key));
1043    }
1044}