netstack3_base/data_structures/token_bucket.rs

1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use core::time::Duration;
6
7use crate::InstantContext;
8
/// One second, the fixed accounting window over which the bucket refills.
// TODO(https://github.com/rust-lang/rust/issues/57391): Replace this with Duration::SECOND.
const SECOND: Duration = Duration::from_secs(1);

/// Instead of actually storing the number of tokens, we store the number of
/// fractions of `1 / TOKEN_MULTIPLIER`. If we stored the number of tokens, then
/// under heavy load scenarios, the actual observed rate could be far off from
/// the ideal rate due to integer rounding issues. Storing fractions instead
/// limits the inaccuracy to at most `1 / TOKEN_MULTIPLIER` away from the ideal
/// rate. See the comment in `try_take` for more details.
///
/// Note that the choice of 256 for `TOKEN_MULTIPLIER` provides us with good
/// accuracy (only deviating from the ideal rate by 1/256) while still allowing
/// for a maximum rate of 2^56 tokens per second.
const TOKEN_MULTIPLIER: u64 = 256;
23
24/// A [token bucket] used for rate limiting.
25///
26/// `TokenBucket` implements rate limiting by "filling" a bucket with "tokens"
27/// at a constant rate, and allowing tokens to be consumed from the bucket until
28/// it is empty. This guarantees that a consumer may only maintain a rate of
29/// consumption faster than the rate of refilling for a bounded amount of time
30/// before they will catch up and find the bucket empty.
31///
32/// Note that the bucket has a maximum size beyond which no new tokens will be
33/// added. This prevents a long quiet period from building up a large backlog of
34/// tokens which can then be used in an intense and sustained burst.
35///
36/// This implementation does not require any background threads or timers to
37/// operate; it refills the bucket during calls to `try_take`, so no extra
38/// infrastructure is required to use it.
39///
40/// [token bucket]: https://en.wikipedia.org/wiki/Token_bucket
pub struct TokenBucket<I> {
    // The last time that the bucket was refilled, or `None` if the bucket has
    // never been refilled.
    last_refilled: Option<I>,
    // The number of token fractions (1/`TOKEN_MULTIPLIER` of a token each)
    // currently available; `try_take` consumes `TOKEN_MULTIPLIER` fractions
    // per token taken.
    token_fractions: u64,
    // The refill rate in token fractions per second; equal to
    // `tokens_per_second * TOKEN_MULTIPLIER` (see `new`).
    token_fractions_per_second: u64,
}
48
49impl<I> TokenBucket<I> {
50    /// Constructs a new `TokenBucket` and initializes it with one second's
51    /// worth of tokens.
52    ///
53    /// # Panics
54    ///
55    /// `new` panics if `tokens_per_second` is greater than 2^56 - 1.
56    pub fn new(tokens_per_second: u64) -> TokenBucket<I> {
57        let token_fractions_per_second = tokens_per_second.checked_mul(TOKEN_MULTIPLIER).unwrap();
58        TokenBucket {
59            last_refilled: None,
60            // Initialize to 0 so that the first call to `try_take` will
61            // initialize the `last_refilled` time and fill the bucket. If we
62            // initialized this to a full bucket, then an immediate burst of
63            // calls to `try_take` would appear as though they'd happened over
64            // the course of a second, and the client would effectively get
65            // double the ideal rate until the second round of tokens expired.
66            token_fractions: 0,
67            token_fractions_per_second,
68        }
69    }
70}
71
72impl<I: crate::Instant> TokenBucket<I> {
73    /// Attempt to take a token from the bucket.
74    ///
75    /// `try_take` attempts to take a token from the bucket. If the bucket is
76    /// currently empty, then no token is available to be taken, and `try_take`
77    /// return false.
78    pub fn try_take<BC: InstantContext<Instant = I>>(&mut self, bindings_ctx: &BC) -> bool {
79        if self.token_fractions >= TOKEN_MULTIPLIER {
80            self.token_fractions -= TOKEN_MULTIPLIER;
81            return true;
82        }
83
84        // The algorithm implemented here is as follows: Whenever the bucket
85        // empties, refill it immediately. In order not to violate the
86        // requirement that tokens are added at a particular rate, we only add
87        // the number of tokens that "should have been" added since the last
88        // refill. We never add more than one second's worth of tokens at a time
89        // in order to guarantee that the bucket never has more than one
90        // second's worth of tokens in it.
91        //
92        // If tokens are being consumed at a rate slower than they are being
93        // added, then we will exhaust the bucket less often than once per
94        // second, and every refill will be a complete refill. If tokens are
95        // being consumed at a rate faster than they are being added, then the
96        // duration between refills will continuously decrease until every call
97        // to `try_take` adds 0 or t in [1, 2) tokens.
98        //
99        // Consider, for example, a production rate of 32 tokens per second and
100        // a consumption rate of 64 tokens per second:
101        // - First, there are 32 tokens in the bucket.
102        // - After 0.5 seconds, all 32 have been exhausted.
103        // - The call to `try_take` which exhausts the bucket refills the bucket
104        //   with 0.5 seconds' worth of tokens, or 16 tokens.
105        //
106        // This process repeats itself, halving the number of tokens added (and
107        // halving the amount of time to exhaust the bucket) until, after an
108        // amount of time which is linear in the rate of tokens being added, a
109        // call to `try_take` adds only 0 or t in [1, 2) tokens. In either case,
110        // the bucket is left with less than 1 token (if `try_take` adds >= 1
111        // token, it also consumes 1 token immediately).
112        //
113        // This has the potential downside of, under heavy load, executing a
114        // slightly more complex algorithm on every call to `try_take`, which
115        // includes querying for the current time. I (joshlf) speculate that
116        // this isn't an issue in practice, but it's worth calling out in case
117        // it becomes an issue in the future.
118
119        let now = bindings_ctx.now();
120        // The duration since the last refill, or 1 second, whichever is
121        // shorter. If this is the first fill, pretend that a full second has
122        // elapsed since the previous refill. In reality, there was no previous
123        // refill, which means it's fine to fill the bucket completely.
124        let dur_since_last_refilled = self.last_refilled.map_or(SECOND, |last_refilled| {
125            let dur = now.saturating_duration_since(last_refilled);
126            if dur > SECOND {
127                SECOND
128            } else {
129                dur
130            }
131        });
132
133        // Do math in u128 to avoid overflow. Be careful to multiply first and
134        // then divide to minimize integer division rounding error. The result
135        // of the calculation should always fit in a `u64` because the ratio
136        // `dur_since_last_refilled / SECOND` is guaranteed not to be greater
137        // than 1.
138        let added_token_fractions = u64::try_from(
139            (u128::from(self.token_fractions_per_second) * dur_since_last_refilled.as_nanos())
140                / SECOND.as_nanos(),
141        )
142        .unwrap();
143
144        // Only refill the bucket if we can add at least 1 token. This avoids
145        // two failure modes:
146        // - If we always blindly added however many token fractions are
147        //   available, then under heavy load, we might constantly add 0 token
148        //   fractions (because less time has elapsed since `last_refilled` than
149        //   is required to add a single token fraction) while still updating
150        //   `last_refilled` each time. This would drop the observed rate to 0
151        //   in the worst case.
152        // - If we always added >= 1 token fraction (as opposed to >= 1 full
153        //   token), then we would run into integer math inaccuracy issues. In
154        //   the worst case, `try_take` would be called after just less than the
155        //   amount of time required to add two token fractions. The actual
156        //   number of token fractions added would be rounded down to 1, and the
157        //   observed rate would be slightly more than 1/2 of the ideal rate.
158        //
159        // By always adding at least 1 token, we ensure that the worst case
160        // behavior is when `try_take` is called after just less than the amount
161        // of time required to add `TOKEN_MULTIPLIER + 1` token fractions has
162        // elapsed. In this case, the actual number of token fractions added is
163        // rounded down to 1, and the observed rate is within `1 /
164        // TOKEN_MULTIPLIER` of the ideal rate.
165        if let Some(new_token_fractions) =
166            (self.token_fractions + added_token_fractions).checked_sub(TOKEN_MULTIPLIER)
167        {
168            self.token_fractions = new_token_fractions;
169            self.last_refilled = Some(now);
170            true
171        } else {
172            return false;
173        }
174    }
175}
176
#[cfg(test)]
pub(crate) mod tests {
    use super::*;

    use crate::testutil::{FakeInstant, FakeInstantCtx};

    impl<I: crate::Instant> TokenBucket<I> {
        /// Call `try_take` `n` times, and assert that it succeeds every time.
        fn assert_take_n<BC: InstantContext<Instant = I>>(&mut self, bindings_ctx: &BC, n: usize) {
            for _ in 0..n {
                assert!(self.try_take(bindings_ctx));
            }
        }
    }

    #[test]
    fn test_token_bucket() {
        /// Construct a `FakeInstantCtx` and a `TokenBucket` with a rate of 64
        /// tokens per second, and pass them to `f`.
        fn test<F: FnOnce(FakeInstantCtx, TokenBucket<FakeInstant>)>(f: F) {
            f(FakeInstantCtx::default(), TokenBucket::new(64));
        }

        // Test that, if we consume all of the tokens in the bucket, but do not
        // attempt to consume any more than that, the bucket will not be
        // updated.
        test(|mut ctx, mut bucket| {
            let epoch = ctx.now();
            assert!(bucket.try_take(&ctx));
            assert_eq!(bucket.last_refilled.unwrap(), epoch);
            assert_eq!(bucket.token_fractions, 63 * TOKEN_MULTIPLIER);

            // Sleep so that the current time will be different than the time at
            // which the `last_refilled` time was initialized. That way, we can
            // tell whether the `last_refilled` field was updated or not.
            ctx.sleep(SECOND);
            bucket.assert_take_n(&ctx, 63);
            assert_eq!(bucket.last_refilled.unwrap(), epoch);
            assert_eq!(bucket.token_fractions, 0);
        });

        // Test that, if we try to consume a token when the bucket is empty, it
        // will get refilled.
        test(|mut ctx, mut bucket| {
            let epoch = ctx.now();
            assert!(bucket.try_take(&ctx));
            assert_eq!(bucket.last_refilled.unwrap(), epoch);
            assert_eq!(bucket.token_fractions, 63 * TOKEN_MULTIPLIER);

            // Sleep for one second so that the bucket will be completely
            // refilled.
            ctx.sleep(SECOND);
            bucket.assert_take_n(&ctx, 64);
            assert_eq!(bucket.last_refilled.unwrap(), FakeInstant::from(SECOND));
            // 1 token was consumed by the last call to `try_take`.
            assert_eq!(bucket.token_fractions, 63 * TOKEN_MULTIPLIER);
        });

        // Test that, if more than 1 second has elapsed since the previous
        // refill, we still only fill with 1 second's worth of tokens.
        test(|mut ctx, mut bucket| {
            let epoch = ctx.now();
            assert!(bucket.try_take(&ctx));
            assert_eq!(bucket.last_refilled.unwrap(), epoch);
            assert_eq!(bucket.token_fractions, 63 * TOKEN_MULTIPLIER);

            ctx.sleep(SECOND * 2);
            bucket.assert_take_n(&ctx, 64);
            assert_eq!(bucket.last_refilled.unwrap(), FakeInstant::from(SECOND * 2));
            // 1 token was consumed by the last call to `try_take`.
            assert_eq!(bucket.token_fractions, 63 * TOKEN_MULTIPLIER);
        });

        // Test that, if we refill the bucket when less then a second has
        // elapsed, a proportional amount of the bucket is refilled.
        test(|mut ctx, mut bucket| {
            let epoch = ctx.now();
            assert!(bucket.try_take(&ctx));
            assert_eq!(bucket.last_refilled.unwrap(), epoch);
            assert_eq!(bucket.token_fractions, 63 * TOKEN_MULTIPLIER);

            ctx.sleep(SECOND / 2);
            bucket.assert_take_n(&ctx, 64);
            assert_eq!(bucket.last_refilled.unwrap(), FakeInstant::from(SECOND / 2));
            // Since only half a second had elapsed since the previous refill,
            // only half of the tokens were refilled. 1 was consumed by the last
            // call to `try_take`.
            assert_eq!(bucket.token_fractions, 31 * TOKEN_MULTIPLIER);
        });

        // Test that, if we try to consume a token when the bucket is empty and
        // not enough time has elapsed to allow for any tokens to be added,
        // `try_take` will fail and the bucket will remain empty.
        test(|mut ctx, mut bucket| {
            // Allow 1/128 of a second to elapse so we know we're not just
            // dealing with a consequence of no time having elapsed. The
            // "correct" number of tokens to add after 1/128 of a second is
            // 1/2, which will be rounded down to 0.
            let epoch = ctx.now();
            bucket.assert_take_n(&ctx, 64);
            ctx.sleep(SECOND / 128);
            assert!(!bucket.try_take(&ctx));
            assert_eq!(bucket.last_refilled.unwrap(), epoch);
            assert_eq!(bucket.token_fractions, 0);
        });

        // Test that, as long as we consume tokens at exactly the right rate, we
        // never fail to consume a token.
        test(|mut ctx, mut bucket| {
            // Initialize the `last_refilled` time and then drain the bucket,
            // leaving the `last_refilled` time at t=0 and the bucket empty.
            bucket.assert_take_n(&ctx, 64);
            for _ in 0..1_000 {
                // `Duration`s store nanoseconds under the hood, and 64 divides
                // 1e9 evenly, so this is lossless.
                ctx.sleep(SECOND / 64);
                assert!(bucket.try_take(&ctx));
                assert_eq!(bucket.token_fractions, 0);
                assert_eq!(bucket.last_refilled.unwrap(), ctx.now());
            }
        });

        // Test that, if we consume tokens too quickly, we succeed in consuming
        // tokens the correct proportion of the time.
        //
        // Test with rates close to 1 (2/1 through 5/4) and rates much larger
        // than 1 (3/1 through 6/1).
        for (numer, denom) in
            [(2, 1), (3, 2), (4, 3), (5, 4), (3, 1), (4, 1), (5, 1), (6, 1)].iter()
        {
            test(|mut ctx, mut bucket| {
                // Initialize the `last_refilled` time and then drain the
                // bucket, leaving the `last_refilled` time at t=0 and the
                // bucket empty.
                bucket.assert_take_n(&ctx, 64);

                const ATTEMPTS: u32 = 1_000;
                let mut successes = 0;
                for _ in 0..ATTEMPTS {
                    // In order to speed up by a factor of numer/denom, we
                    // multiply the duration between tries by its inverse,
                    // denom/numer.
                    ctx.sleep((SECOND * *denom) / (64 * *numer));
                    if bucket.try_take(&ctx) {
                        successes += 1;
                        assert_eq!(bucket.last_refilled.unwrap(), ctx.now());
                    }
                }

                // The observed rate can be up to 1/TOKEN_MULTIPLIER off in
                // either direction.
                let ideal_successes = (ATTEMPTS * denom) / numer;
                let mult = u32::try_from(TOKEN_MULTIPLIER).unwrap();
                assert!(successes <= (ideal_successes * (mult + 1)) / mult);
                assert!(successes >= (ideal_successes * (mult - 1)) / mult);
            });
        }
    }

    #[test]
    fn test_token_bucket_new() {
        // Test that `new` doesn't panic if given 2^56 - 1.
        let _: TokenBucket<()> = TokenBucket::<()>::new((1 << 56) - 1);
    }

    #[test]
    #[should_panic]
    fn test_token_bucket_new_panics() {
        // Test that `new` panics if given 2^56
        let _: TokenBucket<()> = TokenBucket::<()>::new(1 << 56);
    }
}
349
#[cfg(any(test, benchmark))]
pub(crate) mod benchmarks {
    use super::*;

    use crate::bench;
    use crate::testutil::{Bencher, FakeInstantCtx};

    /// Benchmarks `try_take` on a bucket refilling at `enforced_rate` tokens
    /// per second while attempts are made at `try_rate` per second (the fake
    /// clock is advanced by `1 / try_rate` seconds before each attempt).
    fn bench_try_take<B: Bencher>(b: &mut B, enforced_rate: u64, try_rate: u32) {
        let sleep = SECOND / try_rate;
        let mut ctx = FakeInstantCtx::default();
        let mut bucket = TokenBucket::new(enforced_rate);
        b.iter(|| {
            ctx.sleep(sleep);
            // `black_box` keeps the optimizer from eliding the call or its
            // result.
            let _: bool = B::black_box(bucket.try_take(B::black_box(&ctx)));
        });
    }

    // These benchmarks measure the time taken to remove a token from the token
    // bucket (using try_take) when tokens are being removed at various rates
    // (relative to the rate at which they fill into the bucket).
    // These benchmarks use the fastest possible `InstantContext`, and should be
    // considered an upper bound on performance.

    // Call `try_take` at 1/64 the enforced rate.
    bench!(bench_try_take_slow, |b| bench_try_take(b, 64, 1));
    // Call `try_take` at 1/2 the enforced rate.
    bench!(bench_try_take_half_rate, |b| bench_try_take(b, 64, 32));
    // Call `try_take` at the enforced rate.
    bench!(bench_try_take_equal_rate, |b| bench_try_take(b, 64, 64));
    // Call `try_take` at 65/64 the enforced rate.
    bench!(bench_try_take_almost_equal_rate, |b| bench_try_take(b, 64, 65));
    // Call `try_take` at 2x the enforced rate.
    bench!(bench_try_take_double_rate, |b| bench_try_take(b, 64, 64 * 2));

    #[cfg(benchmark)]
    pub fn add_benches(b: criterion::Benchmark) -> criterion::Benchmark {
        let mut b = b.with_function("TokenBucket/TryTake/Slow", bench_try_take_slow);
        b = b.with_function("TokenBucket/TryTake/HalfRate", bench_try_take_half_rate);
        b = b.with_function("TokenBucket/TryTake/EqualRate", bench_try_take_equal_rate);
        b = b
            .with_function("TokenBucket/TryTake/AlmostEqualRate", bench_try_take_almost_equal_rate);
        b.with_function("TokenBucket/TryTake/DoubleRate", bench_try_take_double_rate)
    }
}