netstack3_base/data_structures/token_bucket.rs

// Copyright 2019 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use core::time::Duration;

use crate::InstantContext;

// TODO(https://github.com/rust-lang/rust/issues/57391): Replace this with Duration::SECOND.
const SECOND: Duration = Duration::from_secs(1);

/// Instead of actually storing the number of tokens, we store the number of
/// fractions of `1 / TOKEN_MULTIPLIER`. If we stored the number of tokens, then
/// under heavy load scenarios, the actual observed rate could be far off from
/// the ideal rate due to integer rounding issues. Storing fractions instead
/// limits the inaccuracy to at most `1 / TOKEN_MULTIPLIER` away from the ideal
/// rate. See the comment in `try_take` for more details.
///
/// Note that the choice of 256 for `TOKEN_MULTIPLIER` provides us with good
/// accuracy (only deviating from the ideal rate by 1/256) while still allowing
/// for a maximum rate of 2^56 - 1 tokens per second.
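///
/// As a concrete example, at a rate of 4 tokens per second the bucket tracks
/// `4 * 256 = 1024` token fractions per second, and consuming one token in
/// `try_take` subtracts `TOKEN_MULTIPLIER` (256) fractions.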
const TOKEN_MULTIPLIER: u64 = 256;

/// A [token bucket] used for rate limiting.
///
/// `TokenBucket` implements rate limiting by "filling" a bucket with "tokens"
/// at a constant rate, and allowing tokens to be consumed from the bucket until
/// it is empty. This guarantees that a consumer may only maintain a rate of
/// consumption faster than the rate of refilling for a bounded amount of time
/// before they catch up and find the bucket empty.
///
/// Note that the bucket has a maximum size beyond which no new tokens will be
/// added. This prevents a long quiet period from building up a large backlog of
/// tokens which can then be used in an intense and sustained burst.
///
/// This implementation does not require any background threads or timers to
/// operate; it refills the bucket during calls to `try_take`, so no extra
/// infrastructure is required to use it.
///
/// [token bucket]: https://en.wikipedia.org/wiki/Token_bucket
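///
/// # Example
///
/// A minimal usage sketch, assuming some `bindings_ctx` that implements
/// [`InstantContext`] (the names here are illustrative):
///
/// ```ignore
/// // Allow at most 64 events per second.
/// let mut bucket = TokenBucket::new(64);
/// if bucket.try_take(&bindings_ctx) {
///     // Within budget: handle the event.
/// } else {
///     // Budget exhausted: drop the event.
/// }
/// ```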
pub struct TokenBucket<I> {
    // The last time that the bucket was refilled, or `None` if the bucket has
    // never been refilled.
    last_refilled: Option<I>,
    token_fractions: u64,
    token_fractions_per_second: u64,
}

impl<I> TokenBucket<I> {
    /// Constructs a new `TokenBucket` and initializes it with one second's
    /// worth of tokens.
    ///
    /// # Panics
    ///
    /// `new` panics if `tokens_per_second` is greater than 2^56 - 1.
    pub fn new(tokens_per_second: u64) -> TokenBucket<I> {
        let token_fractions_per_second = tokens_per_second.checked_mul(TOKEN_MULTIPLIER).unwrap();
        TokenBucket {
            last_refilled: None,
            // Initialize to 0 so that the first call to `try_take` will
            // initialize the `last_refilled` time and fill the bucket. If we
            // initialized this to a full bucket, then an immediate burst of
            // calls to `try_take` would appear as though they'd happened over
            // the course of a second, and the client would effectively get
            // double the ideal rate until the second round of tokens expired.
            token_fractions: 0,
            token_fractions_per_second,
        }
    }
}

impl<I: crate::Instant> TokenBucket<I> {
    /// Attempts to take a token from the bucket.
    ///
    /// `try_take` attempts to take a token from the bucket. If the bucket is
    /// currently empty, then no token is available to be taken, and `try_take`
    /// returns `false`.
    pub fn try_take<BC: InstantContext<Instant = I>>(&mut self, bindings_ctx: &BC) -> bool {
        if self.token_fractions >= TOKEN_MULTIPLIER {
            self.token_fractions -= TOKEN_MULTIPLIER;
            return true;
        }

        // The algorithm implemented here is as follows: Whenever the bucket
        // empties, refill it immediately. In order not to violate the
        // requirement that tokens are added at a particular rate, we only add
        // the number of tokens that "should have been" added since the last
        // refill. We never add more than one second's worth of tokens at a time
        // in order to guarantee that the bucket never has more than one
        // second's worth of tokens in it.
        //
        // If tokens are being consumed at a rate slower than they are being
        // added, then we will exhaust the bucket less often than once per
        // second, and every refill will be a complete refill. If tokens are
        // being consumed at a rate faster than they are being added, then the
        // duration between refills will continuously decrease until every call
        // to `try_take` adds 0 or t in [1, 2) tokens.
        //
        // Consider, for example, a production rate of 32 tokens per second and
        // a consumption rate of 64 tokens per second:
        // - First, there are 32 tokens in the bucket.
        // - After 0.5 seconds, all 32 have been exhausted.
        // - The call to `try_take` which exhausts the bucket refills the bucket
        //   with 0.5 seconds' worth of tokens, or 16 tokens.
        //
        // This process repeats itself, halving the number of tokens added (and
        // halving the amount of time to exhaust the bucket) until, after an
        // amount of time which is linear in the rate of tokens being added, a
        // call to `try_take` adds only 0 or t in [1, 2) tokens. In either case,
        // the bucket is left with less than 1 token (if `try_take` adds >= 1
        // token, it also consumes 1 token immediately).
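        // Continuing the example above, successive refills add 16, 8, 4, 2,
        // and then 1 tokens, so after roughly one second every refill adds at
        // most one token's worth of fractions.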
        //
        // This has the potential downside of, under heavy load, executing a
        // slightly more complex algorithm on every call to `try_take`, which
        // includes querying for the current time. I (joshlf) speculate that
        // this isn't an issue in practice, but it's worth calling out in case
        // it becomes an issue in the future.

        let now = bindings_ctx.now();
        // The duration since the last refill, or 1 second, whichever is
        // shorter. If this is the first fill, pretend that a full second has
        // elapsed since the previous refill. In reality, there was no previous
        // refill, which means it's fine to fill the bucket completely.
        let dur_since_last_refilled = self.last_refilled.map_or(SECOND, |last_refilled| {
            let dur = now.saturating_duration_since(last_refilled);
            if dur > SECOND { SECOND } else { dur }
        });

        // Do math in u128 to avoid overflow. Be careful to multiply first and
        // then divide to minimize integer division rounding error. The result
        // of the calculation should always fit in a `u64` because the ratio
        // `dur_since_last_refilled / SECOND` is guaranteed not to be greater
        // than 1.
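        //
        // As a worked example (numbers illustrative): at 64 tokens per second,
        // `token_fractions_per_second` is `64 * 256 = 16_384`; if a quarter of
        // a second (250_000_000 ns) has elapsed since the last refill, this
        // computes `16_384 * 250_000_000 / 1_000_000_000 = 4_096` token
        // fractions, i.e. 16 whole tokens.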
        let added_token_fractions = u64::try_from(
            (u128::from(self.token_fractions_per_second) * dur_since_last_refilled.as_nanos())
                / SECOND.as_nanos(),
        )
        .unwrap();

        // Only refill the bucket if we can add at least 1 token. This avoids
        // two failure modes:
        // - If we always blindly added however many token fractions are
        //   available, then under heavy load, we might constantly add 0 token
        //   fractions (because less time has elapsed since `last_refilled` than
        //   is required to add a single token fraction) while still updating
        //   `last_refilled` each time. This would drop the observed rate to 0
        //   in the worst case.
        // - If we always added >= 1 token fraction (as opposed to >= 1 full
        //   token), then we would run into integer math inaccuracy issues. In
        //   the worst case, `try_take` would be called after just less than the
        //   amount of time required to add two token fractions. The actual
        //   number of token fractions added would be rounded down to 1, and the
        //   observed rate would be slightly more than 1/2 of the ideal rate.
        //
        // By always adding at least 1 token, we ensure that the worst-case
        // behavior occurs when `try_take` is called after just less than the
        // amount of time required to add `TOKEN_MULTIPLIER + 1` token fractions
        // has elapsed. In this case, the actual number of token fractions added
        // is rounded down to `TOKEN_MULTIPLIER` (one whole token), and the
        // observed rate is within `1 / TOKEN_MULTIPLIER` of the ideal rate.
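        //
        // Concretely, with `TOKEN_MULTIPLIER = 256`: if every call to
        // `try_take` arrives just before enough time has elapsed to accrue 257
        // fractions, each refill adds only 256 fractions (one token), and the
        // observed rate is 256/257 of the ideal rate.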
        if let Some(new_token_fractions) =
            (self.token_fractions + added_token_fractions).checked_sub(TOKEN_MULTIPLIER)
        {
            self.token_fractions = new_token_fractions;
            self.last_refilled = Some(now);
            true
        } else {
            false
        }
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use super::*;

    use crate::testutil::{FakeInstant, FakeInstantCtx};

    impl<I: crate::Instant> TokenBucket<I> {
        /// Call `try_take` `n` times, and assert that it succeeds every time.
        fn assert_take_n<BC: InstantContext<Instant = I>>(&mut self, bindings_ctx: &BC, n: usize) {
            for _ in 0..n {
                assert!(self.try_take(bindings_ctx));
            }
        }
    }

    #[test]
    fn test_token_bucket() {
        /// Construct a `FakeInstantCtx` and a `TokenBucket` with a rate of 64
        /// tokens per second, and pass them to `f`.
        fn test<F: FnOnce(FakeInstantCtx, TokenBucket<FakeInstant>)>(f: F) {
            f(FakeInstantCtx::default(), TokenBucket::new(64));
        }

        // Test that, if we consume all of the tokens in the bucket, but do not
        // attempt to consume any more than that, the bucket will not be
        // updated.
        test(|mut ctx, mut bucket| {
            let epoch = ctx.now();
            assert!(bucket.try_take(&ctx));
            assert_eq!(bucket.last_refilled.unwrap(), epoch);
            assert_eq!(bucket.token_fractions, 63 * TOKEN_MULTIPLIER);

            // Sleep so that the current time will be different than the time at
            // which the `last_refilled` time was initialized. That way, we can
            // tell whether the `last_refilled` field was updated or not.
            ctx.sleep(SECOND);
            bucket.assert_take_n(&ctx, 63);
            assert_eq!(bucket.last_refilled.unwrap(), epoch);
            assert_eq!(bucket.token_fractions, 0);
        });

        // Test that, if we try to consume a token when the bucket is empty, it
        // will get refilled.
        test(|mut ctx, mut bucket| {
            let epoch = ctx.now();
            assert!(bucket.try_take(&ctx));
            assert_eq!(bucket.last_refilled.unwrap(), epoch);
            assert_eq!(bucket.token_fractions, 63 * TOKEN_MULTIPLIER);

            // Sleep for one second so that the bucket will be completely
            // refilled.
            ctx.sleep(SECOND);
            bucket.assert_take_n(&ctx, 64);
            assert_eq!(bucket.last_refilled.unwrap(), FakeInstant::from(SECOND));
            // 1 token was consumed by the last call to `try_take`.
            assert_eq!(bucket.token_fractions, 63 * TOKEN_MULTIPLIER);
        });

        // Test that, if more than 1 second has elapsed since the previous
        // refill, we still only fill with 1 second's worth of tokens.
        test(|mut ctx, mut bucket| {
            let epoch = ctx.now();
            assert!(bucket.try_take(&ctx));
            assert_eq!(bucket.last_refilled.unwrap(), epoch);
            assert_eq!(bucket.token_fractions, 63 * TOKEN_MULTIPLIER);

            ctx.sleep(SECOND * 2);
            bucket.assert_take_n(&ctx, 64);
            assert_eq!(bucket.last_refilled.unwrap(), FakeInstant::from(SECOND * 2));
            // 1 token was consumed by the last call to `try_take`.
            assert_eq!(bucket.token_fractions, 63 * TOKEN_MULTIPLIER);
        });

        // Test that, if we refill the bucket when less than a second has
        // elapsed, a proportional amount of the bucket is refilled.
        test(|mut ctx, mut bucket| {
            let epoch = ctx.now();
            assert!(bucket.try_take(&ctx));
            assert_eq!(bucket.last_refilled.unwrap(), epoch);
            assert_eq!(bucket.token_fractions, 63 * TOKEN_MULTIPLIER);

            ctx.sleep(SECOND / 2);
            bucket.assert_take_n(&ctx, 64);
            assert_eq!(bucket.last_refilled.unwrap(), FakeInstant::from(SECOND / 2));
            // Since only half a second had elapsed since the previous refill,
            // only half of the tokens were refilled. 1 was consumed by the last
            // call to `try_take`.
            assert_eq!(bucket.token_fractions, 31 * TOKEN_MULTIPLIER);
        });

        // Test that, if we try to consume a token when the bucket is empty and
        // not enough time has elapsed to allow for any tokens to be added,
        // `try_take` will fail and the bucket will remain empty.
        test(|mut ctx, mut bucket| {
            // Allow 1/128 of a second to elapse so we know we're not just
            // dealing with a consequence of no time having elapsed. The
            // "correct" number of tokens to add after 1/128 of a second is
            // 64/128 = 1/2, which will be rounded down to 0.
            let epoch = ctx.now();
            bucket.assert_take_n(&ctx, 64);
            ctx.sleep(SECOND / 128);
            assert!(!bucket.try_take(&ctx));
            assert_eq!(bucket.last_refilled.unwrap(), epoch);
            assert_eq!(bucket.token_fractions, 0);
        });

        // Test that, as long as we consume tokens at exactly the right rate, we
        // never fail to consume a token.
        test(|mut ctx, mut bucket| {
            // Initialize the `last_refilled` time and then drain the bucket,
            // leaving the `last_refilled` time at t=0 and the bucket empty.
            bucket.assert_take_n(&ctx, 64);
            for _ in 0..1_000 {
                // `Duration`s store nanoseconds under the hood, and 64 divides
                // 1e9 evenly, so this is lossless.
                ctx.sleep(SECOND / 64);
                assert!(bucket.try_take(&ctx));
                assert_eq!(bucket.token_fractions, 0);
                assert_eq!(bucket.last_refilled.unwrap(), ctx.now());
            }
        });

        // Test that, if we consume tokens too quickly, we succeed in consuming
        // tokens the correct proportion of the time.
        //
        // Test with rates close to 1 (2/1 through 5/4) and rates much larger
        // than 1 (3/1 through 6/1).
        for (numer, denom) in
            [(2, 1), (3, 2), (4, 3), (5, 4), (3, 1), (4, 1), (5, 1), (6, 1)].iter()
        {
            test(|mut ctx, mut bucket| {
                // Initialize the `last_refilled` time and then drain the
                // bucket, leaving the `last_refilled` time at t=0 and the
                // bucket empty.
                bucket.assert_take_n(&ctx, 64);

                const ATTEMPTS: u32 = 1_000;
                let mut successes = 0;
                for _ in 0..ATTEMPTS {
                    // In order to speed up by a factor of numer/denom, we
                    // multiply the duration between tries by its inverse,
                    // denom/numer.
                    ctx.sleep((SECOND * *denom) / (64 * *numer));
                    if bucket.try_take(&ctx) {
                        successes += 1;
                        assert_eq!(bucket.last_refilled.unwrap(), ctx.now());
                    }
                }

                // The observed rate can be up to 1/TOKEN_MULTIPLIER off in
                // either direction.
                let ideal_successes = (ATTEMPTS * denom) / numer;
                let mult = u32::try_from(TOKEN_MULTIPLIER).unwrap();
                assert!(successes <= (ideal_successes * (mult + 1)) / mult);
                assert!(successes >= (ideal_successes * (mult - 1)) / mult);
            });
        }
    }

    #[test]
    fn test_token_bucket_new() {
        // Test that `new` doesn't panic if given 2^56 - 1.
        let _: TokenBucket<()> = TokenBucket::<()>::new((1 << 56) - 1);
    }

    #[test]
    #[should_panic]
    fn test_token_bucket_new_panics() {
        // Test that `new` panics if given 2^56.
        let _: TokenBucket<()> = TokenBucket::<()>::new(1 << 56);
    }
}

#[cfg(any(test, benchmark))]
pub(crate) mod benchmarks {
    use super::*;

    use crate::bench;
    use crate::testutil::{Bencher, FakeInstantCtx};

    fn bench_try_take<B: Bencher>(b: &mut B, enforced_rate: u64, try_rate: u32) {
        let sleep = SECOND / try_rate;
        let mut ctx = FakeInstantCtx::default();
        let mut bucket = TokenBucket::new(enforced_rate);
        b.iter(|| {
            ctx.sleep(sleep);
            let _: bool = B::black_box(bucket.try_take(B::black_box(&ctx)));
        });
    }

    // These benchmarks measure the time taken to remove a token from the token
    // bucket (using `try_take`) when tokens are being removed at various rates
    // relative to the rate at which the bucket refills.
    //
    // These benchmarks use the fastest possible `InstantContext`, and should be
    // considered an upper bound on performance.

    // Call `try_take` at 1/64 the enforced rate.
    bench!(bench_try_take_slow, |b| bench_try_take(b, 64, 1));
    // Call `try_take` at 1/2 the enforced rate.
    bench!(bench_try_take_half_rate, |b| bench_try_take(b, 64, 32));
    // Call `try_take` at the enforced rate.
    bench!(bench_try_take_equal_rate, |b| bench_try_take(b, 64, 64));
    // Call `try_take` at 65/64 the enforced rate.
    bench!(bench_try_take_almost_equal_rate, |b| bench_try_take(b, 64, 65));
    // Call `try_take` at 2x the enforced rate.
    bench!(bench_try_take_double_rate, |b| bench_try_take(b, 64, 64 * 2));

    #[cfg(benchmark)]
    pub fn add_benches(b: criterion::Benchmark) -> criterion::Benchmark {
        let mut b = b.with_function("TokenBucket/TryTake/Slow", bench_try_take_slow);
        b = b.with_function("TokenBucket/TryTake/HalfRate", bench_try_take_half_rate);
        b = b.with_function("TokenBucket/TryTake/EqualRate", bench_try_take_equal_rate);
        b = b
            .with_function("TokenBucket/TryTake/AlmostEqualRate", bench_try_take_almost_equal_rate);
        b.with_function("TokenBucket/TryTake/DoubleRate", bench_try_take_double_rate)
    }
}