netstack3_base/data_structures/token_bucket.rs
1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use core::time::Duration;
6
7use crate::InstantContext;
8
// TODO(https://github.com/rust-lang/rust/issues/57391): Replace this with Duration::SECOND.
/// One second. This is the refill window: a single refill never accounts
/// for more elapsed time than this, and rates are expressed per `SECOND`.
const SECOND: Duration = Duration::from_millis(1_000);
11
/// Instead of actually storing the number of tokens, we store the number of
/// fractions of `1 / TOKEN_MULTIPLIER` of a token. If we stored whole tokens,
/// then under heavy load the actual observed rate could be far off from the
/// ideal rate due to integer rounding. Storing fractions instead limits the
/// inaccuracy to at most `1 / TOKEN_MULTIPLIER` of the ideal rate. See the
/// comment in `try_take` for more details.
///
/// The choice of 256 for `TOKEN_MULTIPLIER` provides good accuracy (the
/// observed rate deviates from the ideal rate by at most 1/256) while still
/// allowing a maximum rate of 2^56 - 1 tokens per second. The rate is stored
/// internally as `tokens_per_second * TOKEN_MULTIPLIER` in a `u64`, and
/// `2^56 * 256 = 2^64` would overflow.
const TOKEN_MULTIPLIER: u64 = 256;
23
/// A [token bucket] for rate limiting.
///
/// Tokens are added ("filled") into the bucket at a constant rate and may be
/// consumed until the bucket runs dry. A consumer can therefore only outpace
/// the refill rate for a bounded amount of time before it catches up and
/// finds the bucket empty.
///
/// The bucket never holds more than one second's worth of tokens. A long
/// quiet period therefore cannot be banked and later spent in an intense,
/// sustained burst.
///
/// No background thread or timer is involved: the bucket is refilled lazily
/// from within `try_take`, so using it needs no extra infrastructure.
///
/// [token bucket]: https://en.wikipedia.org/wiki/Token_bucket
pub struct TokenBucket<I> {
    /// When the bucket was last refilled, or `None` if it has never been
    /// refilled.
    last_refilled: Option<I>,
    /// Tokens currently in the bucket, counted in units of
    /// `1 / TOKEN_MULTIPLIER` of a token.
    token_fractions: u64,
    /// Refill rate, counted in `1 / TOKEN_MULTIPLIER` token fractions per
    /// second.
    token_fractions_per_second: u64,
}
48
49impl<I> TokenBucket<I> {
50 /// Constructs a new `TokenBucket` and initializes it with one second's
51 /// worth of tokens.
52 ///
53 /// # Panics
54 ///
55 /// `new` panics if `tokens_per_second` is greater than 2^56 - 1.
56 pub fn new(tokens_per_second: u64) -> TokenBucket<I> {
57 let token_fractions_per_second = tokens_per_second.checked_mul(TOKEN_MULTIPLIER).unwrap();
58 TokenBucket {
59 last_refilled: None,
60 // Initialize to 0 so that the first call to `try_take` will
61 // initialize the `last_refilled` time and fill the bucket. If we
62 // initialized this to a full bucket, then an immediate burst of
63 // calls to `try_take` would appear as though they'd happened over
64 // the course of a second, and the client would effectively get
65 // double the ideal rate until the second round of tokens expired.
66 token_fractions: 0,
67 token_fractions_per_second,
68 }
69 }
70}
71
72impl<I: crate::Instant> TokenBucket<I> {
73 /// Attempt to take a token from the bucket.
74 ///
75 /// `try_take` attempts to take a token from the bucket. If the bucket is
76 /// currently empty, then no token is available to be taken, and `try_take`
77 /// return false.
78 pub fn try_take<BC: InstantContext<Instant = I>>(&mut self, bindings_ctx: &BC) -> bool {
79 if self.token_fractions >= TOKEN_MULTIPLIER {
80 self.token_fractions -= TOKEN_MULTIPLIER;
81 return true;
82 }
83
84 // The algorithm implemented here is as follows: Whenever the bucket
85 // empties, refill it immediately. In order not to violate the
86 // requirement that tokens are added at a particular rate, we only add
87 // the number of tokens that "should have been" added since the last
88 // refill. We never add more than one second's worth of tokens at a time
89 // in order to guarantee that the bucket never has more than one
90 // second's worth of tokens in it.
91 //
92 // If tokens are being consumed at a rate slower than they are being
93 // added, then we will exhaust the bucket less often than once per
94 // second, and every refill will be a complete refill. If tokens are
95 // being consumed at a rate faster than they are being added, then the
96 // duration between refills will continuously decrease until every call
97 // to `try_take` adds 0 or t in [1, 2) tokens.
98 //
99 // Consider, for example, a production rate of 32 tokens per second and
100 // a consumption rate of 64 tokens per second:
101 // - First, there are 32 tokens in the bucket.
102 // - After 0.5 seconds, all 32 have been exhausted.
103 // - The call to `try_take` which exhausts the bucket refills the bucket
104 // with 0.5 seconds' worth of tokens, or 16 tokens.
105 //
106 // This process repeats itself, halving the number of tokens added (and
107 // halving the amount of time to exhaust the bucket) until, after an
108 // amount of time which is linear in the rate of tokens being added, a
109 // call to `try_take` adds only 0 or t in [1, 2) tokens. In either case,
110 // the bucket is left with less than 1 token (if `try_take` adds >= 1
111 // token, it also consumes 1 token immediately).
112 //
113 // This has the potential downside of, under heavy load, executing a
114 // slightly more complex algorithm on every call to `try_take`, which
115 // includes querying for the current time. I (joshlf) speculate that
116 // this isn't an issue in practice, but it's worth calling out in case
117 // it becomes an issue in the future.
118
119 let now = bindings_ctx.now();
120 // The duration since the last refill, or 1 second, whichever is
121 // shorter. If this is the first fill, pretend that a full second has
122 // elapsed since the previous refill. In reality, there was no previous
123 // refill, which means it's fine to fill the bucket completely.
124 let dur_since_last_refilled = self.last_refilled.map_or(SECOND, |last_refilled| {
125 let dur = now.saturating_duration_since(last_refilled);
126 if dur > SECOND { SECOND } else { dur }
127 });
128
129 // Do math in u128 to avoid overflow. Be careful to multiply first and
130 // then divide to minimize integer division rounding error. The result
131 // of the calculation should always fit in a `u64` because the ratio
132 // `dur_since_last_refilled / SECOND` is guaranteed not to be greater
133 // than 1.
134 let added_token_fractions = u64::try_from(
135 (u128::from(self.token_fractions_per_second) * dur_since_last_refilled.as_nanos())
136 / SECOND.as_nanos(),
137 )
138 .unwrap();
139
140 // Only refill the bucket if we can add at least 1 token. This avoids
141 // two failure modes:
142 // - If we always blindly added however many token fractions are
143 // available, then under heavy load, we might constantly add 0 token
144 // fractions (because less time has elapsed since `last_refilled` than
145 // is required to add a single token fraction) while still updating
146 // `last_refilled` each time. This would drop the observed rate to 0
147 // in the worst case.
148 // - If we always added >= 1 token fraction (as opposed to >= 1 full
149 // token), then we would run into integer math inaccuracy issues. In
150 // the worst case, `try_take` would be called after just less than the
151 // amount of time required to add two token fractions. The actual
152 // number of token fractions added would be rounded down to 1, and the
153 // observed rate would be slightly more than 1/2 of the ideal rate.
154 //
155 // By always adding at least 1 token, we ensure that the worst case
156 // behavior is when `try_take` is called after just less than the amount
157 // of time required to add `TOKEN_MULTIPLIER + 1` token fractions has
158 // elapsed. In this case, the actual number of token fractions added is
159 // rounded down to 1, and the observed rate is within `1 /
160 // TOKEN_MULTIPLIER` of the ideal rate.
161 if let Some(new_token_fractions) =
162 (self.token_fractions + added_token_fractions).checked_sub(TOKEN_MULTIPLIER)
163 {
164 self.token_fractions = new_token_fractions;
165 self.last_refilled = Some(now);
166 true
167 } else {
168 return false;
169 }
170 }
171}
172
#[cfg(test)]
pub(crate) mod tests {
    use super::*;

    use crate::testutil::{FakeInstant, FakeInstantCtx};

    impl<I: crate::Instant> TokenBucket<I> {
        /// Call `try_take` `n` times, and assert that it succeeds every time.
        fn assert_take_n<BC: InstantContext<Instant = I>>(&mut self, bindings_ctx: &BC, n: usize) {
            for _ in 0..n {
                assert!(self.try_take(bindings_ctx));
            }
        }
    }

    #[test]
    fn test_token_bucket() {
        /// Construct a `FakeInstantCtx` and a `TokenBucket` with a rate of 64
        /// tokens per second, and pass them to `f`.
        fn test<F: FnOnce(FakeInstantCtx, TokenBucket<FakeInstant>)>(f: F) {
            f(FakeInstantCtx::default(), TokenBucket::new(64));
        }

        // Test that, if we consume all of the tokens in the bucket, but do not
        // attempt to consume any more than that, the bucket will not be
        // updated.
        test(|mut ctx, mut bucket| {
            let epoch = ctx.now();
            assert!(bucket.try_take(&ctx));
            assert_eq!(bucket.last_refilled.unwrap(), epoch);
            assert_eq!(bucket.token_fractions, 63 * TOKEN_MULTIPLIER);

            // Sleep so that the current time will be different than the time at
            // which the `last_refilled` time was initialized. That way, we can
            // tell whether the `last_refilled` field was updated or not.
            ctx.sleep(SECOND);
            bucket.assert_take_n(&ctx, 63);
            assert_eq!(bucket.last_refilled.unwrap(), epoch);
            assert_eq!(bucket.token_fractions, 0);
        });

        // Test that, if we try to consume a token when the bucket is empty, it
        // will get refilled.
        test(|mut ctx, mut bucket| {
            let epoch = ctx.now();
            assert!(bucket.try_take(&ctx));
            assert_eq!(bucket.last_refilled.unwrap(), epoch);
            assert_eq!(bucket.token_fractions, 63 * TOKEN_MULTIPLIER);

            // Sleep for one second so that the bucket will be completely
            // refilled.
            ctx.sleep(SECOND);
            bucket.assert_take_n(&ctx, 64);
            assert_eq!(bucket.last_refilled.unwrap(), FakeInstant::from(SECOND));
            // 1 token was consumed by the last call to `try_take`.
            assert_eq!(bucket.token_fractions, 63 * TOKEN_MULTIPLIER);
        });

        // Test that, if more than 1 second has elapsed since the previous
        // refill, we still only fill with 1 second's worth of tokens.
        test(|mut ctx, mut bucket| {
            let epoch = ctx.now();
            assert!(bucket.try_take(&ctx));
            assert_eq!(bucket.last_refilled.unwrap(), epoch);
            assert_eq!(bucket.token_fractions, 63 * TOKEN_MULTIPLIER);

            ctx.sleep(SECOND * 2);
            bucket.assert_take_n(&ctx, 64);
            assert_eq!(bucket.last_refilled.unwrap(), FakeInstant::from(SECOND * 2));
            // 1 token was consumed by the last call to `try_take`.
            assert_eq!(bucket.token_fractions, 63 * TOKEN_MULTIPLIER);
        });

        // Test that, if we refill the bucket when less than a second has
        // elapsed, a proportional amount of the bucket is refilled.
        test(|mut ctx, mut bucket| {
            let epoch = ctx.now();
            assert!(bucket.try_take(&ctx));
            assert_eq!(bucket.last_refilled.unwrap(), epoch);
            assert_eq!(bucket.token_fractions, 63 * TOKEN_MULTIPLIER);

            ctx.sleep(SECOND / 2);
            bucket.assert_take_n(&ctx, 64);
            assert_eq!(bucket.last_refilled.unwrap(), FakeInstant::from(SECOND / 2));
            // Since only half a second had elapsed since the previous refill,
            // only half of the tokens were refilled. 1 was consumed by the last
            // call to `try_take`.
            assert_eq!(bucket.token_fractions, 31 * TOKEN_MULTIPLIER);
        });

        // Test that, if we try to consume a token when the bucket is empty and
        // not enough time has elapsed to allow for any tokens to be added,
        // `try_take` will fail and the bucket will remain empty.
        test(|mut ctx, mut bucket| {
            let epoch = ctx.now();
            bucket.assert_take_n(&ctx, 64);
            // Allow 1/65 of a second to elapse so we know we're not just
            // dealing with a consequence of no time having elapsed. The
            // "correct" number of tokens to add after 1/65 of a second is
            // 64/65, which is less than one whole token, so none are added.
            ctx.sleep(SECOND / 65);
            assert!(!bucket.try_take(&ctx));
            assert_eq!(bucket.last_refilled.unwrap(), epoch);
            assert_eq!(bucket.token_fractions, 0);
        });

        // Test that, as long as we consume tokens at exactly the right rate, we
        // never fail to consume a token.
        test(|mut ctx, mut bucket| {
            // Initialize the `last_refilled` time and then drain the bucket,
            // leaving the `last_refilled` time at t=0 and the bucket empty.
            bucket.assert_take_n(&ctx, 64);
            for _ in 0..1_000 {
                // `Duration`s store nanoseconds under the hood, and 64 divides
                // 1e9 evenly, so this is lossless.
                ctx.sleep(SECOND / 64);
                assert!(bucket.try_take(&ctx));
                assert_eq!(bucket.token_fractions, 0);
                assert_eq!(bucket.last_refilled.unwrap(), ctx.now());
            }
        });

        // Test that, if we consume tokens too quickly, we succeed in consuming
        // tokens the correct proportion of the time.
        //
        // Test with rates close to 1 (2/1 through 5/4) and rates much larger
        // than 1 (3/1 through 6/1).
        for &(numer, denom) in
            [(2, 1), (3, 2), (4, 3), (5, 4), (3, 1), (4, 1), (5, 1), (6, 1)].iter()
        {
            test(|mut ctx, mut bucket| {
                // Initialize the `last_refilled` time and then drain the
                // bucket, leaving the `last_refilled` time at t=0 and the
                // bucket empty.
                bucket.assert_take_n(&ctx, 64);

                const ATTEMPTS: u32 = 1_000;
                let mut successes = 0;
                for _ in 0..ATTEMPTS {
                    // In order to speed up by a factor of numer/denom, we
                    // multiply the duration between tries by its inverse,
                    // denom/numer.
                    ctx.sleep((SECOND * denom) / (64 * numer));
                    if bucket.try_take(&ctx) {
                        successes += 1;
                        assert_eq!(bucket.last_refilled.unwrap(), ctx.now());
                    }
                }

                // The observed rate can be up to 1/TOKEN_MULTIPLIER off in
                // either direction.
                let ideal_successes = (ATTEMPTS * denom) / numer;
                let mult = u32::try_from(TOKEN_MULTIPLIER).unwrap();
                assert!(successes <= (ideal_successes * (mult + 1)) / mult);
                assert!(successes >= (ideal_successes * (mult - 1)) / mult);
            });
        }
    }

    #[test]
    fn test_token_bucket_new() {
        // Test that `new` doesn't panic if given 2^56 - 1.
        let _: TokenBucket<()> = TokenBucket::<()>::new((1 << 56) - 1);
    }

    #[test]
    #[should_panic]
    fn test_token_bucket_new_panics() {
        // Test that `new` panics if given 2^56.
        let _: TokenBucket<()> = TokenBucket::<()>::new(1 << 56);
    }
}
345
#[cfg(any(test, benchmark))]
pub(crate) mod benchmarks {
    use super::*;

    use crate::bench;
    use crate::testutil::{Bencher, FakeInstantCtx};

    /// On every iteration, advances fake time by `1 / try_rate` seconds, then
    /// calls `try_take` on a bucket that refills at `enforced_rate` tokens per
    /// second.
    fn bench_try_take<B: Bencher>(b: &mut B, enforced_rate: u64, try_rate: u32) {
        let mut bucket = TokenBucket::new(enforced_rate);
        let mut ctx = FakeInstantCtx::default();
        let interval = SECOND / try_rate;
        b.iter(|| {
            ctx.sleep(interval);
            let _: bool = B::black_box(bucket.try_take(B::black_box(&ctx)));
        });
    }

    // Each benchmark below measures the cost of `try_take` when tokens are
    // requested at some rate relative to the bucket's refill rate. They use
    // the fastest possible `InstantContext`, so treat the results as an upper
    // bound on real-world performance.

    // Call `try_take` at 1/64 the enforced rate.
    bench!(bench_try_take_slow, |b| bench_try_take(b, 64, 1));
    // Call `try_take` at 1/2 the enforced rate.
    bench!(bench_try_take_half_rate, |b| bench_try_take(b, 64, 32));
    // Call `try_take` at the enforced rate.
    bench!(bench_try_take_equal_rate, |b| bench_try_take(b, 64, 64));
    // Call `try_take` at 65/64 the enforced rate.
    bench!(bench_try_take_almost_equal_rate, |b| bench_try_take(b, 64, 65));
    // Call `try_take` at 2x the enforced rate.
    bench!(bench_try_take_double_rate, |b| bench_try_take(b, 64, 64 * 2));

    /// Registers every `TokenBucket` benchmark with `b`.
    #[cfg(benchmark)]
    pub fn add_benches(b: criterion::Benchmark) -> criterion::Benchmark {
        b.with_function("TokenBucket/TryTake/Slow", bench_try_take_slow)
            .with_function("TokenBucket/TryTake/HalfRate", bench_try_take_half_rate)
            .with_function("TokenBucket/TryTake/EqualRate", bench_try_take_equal_rate)
            .with_function("TokenBucket/TryTake/AlmostEqualRate", bench_try_take_almost_equal_rate)
            .with_function("TokenBucket/TryTake/DoubleRate", bench_try_take_double_rate)
    }
}
389}