// netstack3_base/data_structures/token_bucket.rs
1// Copyright 2019 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use core::time::Duration;
6
7use crate::InstantContext;
8
// TODO(https://github.com/rust-lang/rust/issues/57391): Replace this with Duration::SECOND.
//
// One second serves two roles below: it is the cap on how much time a single
// refill may account for, and therefore also the bucket's maximum capacity
// (one second's worth of tokens).
const SECOND: Duration = Duration::from_secs(1);

/// Instead of actually storing the number of tokens, we store the number of
/// fractions of `1 / TOKEN_MULTIPLIER`. If we stored the number of tokens, then
/// under heavy load scenarios, the actual observed rate could be far off from
/// the ideal rate due to integer rounding issues. Storing fractions instead
/// limits the inaccuracy to at most `1 / TOKEN_MULTIPLIER` away from the ideal
/// rate. See the comment in `try_take` for more details.
///
/// Note that the choice of 256 for `TOKEN_MULTIPLIER` provides us with good
/// accuracy (only deviating from the ideal rate by 1/256) while still allowing
/// for a maximum rate of 2^56 tokens per second.
const TOKEN_MULTIPLIER: u64 = 256;
23
/// A [token bucket] used for rate limiting.
///
/// `TokenBucket` implements rate limiting by "filling" a bucket with "tokens"
/// at a constant rate, and allowing tokens to be consumed from the bucket until
/// it is empty. This guarantees that a consumer may only maintain a rate of
/// consumption faster than the rate of refilling for a bounded amount of time
/// before they will catch up and find the bucket empty.
///
/// Note that the bucket has a maximum size beyond which no new tokens will be
/// added. This prevents a long quiet period from building up a large backlog of
/// tokens which can then be used in an intense and sustained burst.
///
/// This implementation does not require any background threads or timers to
/// operate; it refills the bucket during calls to `try_take`, so no extra
/// infrastructure is required to use it.
///
/// [token bucket]: https://en.wikipedia.org/wiki/Token_bucket
pub struct TokenBucket<I> {
    // The last time that the bucket was refilled, or `None` if the bucket has
    // never been refilled.
    last_refilled: Option<I>,
    // The current contents of the bucket, measured in token fractions
    // (1/TOKEN_MULTIPLIER of a token each), not whole tokens. See the doc
    // comment on `TOKEN_MULTIPLIER` for why.
    token_fractions: u64,
    // The refill rate, also measured in token fractions per second. Computed
    // once in `new` as `tokens_per_second * TOKEN_MULTIPLIER`.
    token_fractions_per_second: u64,
}
48
49impl<I> TokenBucket<I> {
50 /// Constructs a new `TokenBucket` and initializes it with one second's
51 /// worth of tokens.
52 ///
53 /// # Panics
54 ///
55 /// `new` panics if `tokens_per_second` is greater than 2^56 - 1.
56 pub fn new(tokens_per_second: u64) -> TokenBucket<I> {
57 let token_fractions_per_second = tokens_per_second.checked_mul(TOKEN_MULTIPLIER).unwrap();
58 TokenBucket {
59 last_refilled: None,
60 // Initialize to 0 so that the first call to `try_take` will
61 // initialize the `last_refilled` time and fill the bucket. If we
62 // initialized this to a full bucket, then an immediate burst of
63 // calls to `try_take` would appear as though they'd happened over
64 // the course of a second, and the client would effectively get
65 // double the ideal rate until the second round of tokens expired.
66 token_fractions: 0,
67 token_fractions_per_second,
68 }
69 }
70}
71
72impl<I: crate::Instant> TokenBucket<I> {
73 /// Attempt to take a token from the bucket.
74 ///
75 /// `try_take` attempts to take a token from the bucket. If the bucket is
76 /// currently empty, then no token is available to be taken, and `try_take`
77 /// return false.
78 pub fn try_take<BC: InstantContext<Instant = I>>(&mut self, bindings_ctx: &BC) -> bool {
79 if self.token_fractions >= TOKEN_MULTIPLIER {
80 self.token_fractions -= TOKEN_MULTIPLIER;
81 return true;
82 }
83
84 // The algorithm implemented here is as follows: Whenever the bucket
85 // empties, refill it immediately. In order not to violate the
86 // requirement that tokens are added at a particular rate, we only add
87 // the number of tokens that "should have been" added since the last
88 // refill. We never add more than one second's worth of tokens at a time
89 // in order to guarantee that the bucket never has more than one
90 // second's worth of tokens in it.
91 //
92 // If tokens are being consumed at a rate slower than they are being
93 // added, then we will exhaust the bucket less often than once per
94 // second, and every refill will be a complete refill. If tokens are
95 // being consumed at a rate faster than they are being added, then the
96 // duration between refills will continuously decrease until every call
97 // to `try_take` adds 0 or t in [1, 2) tokens.
98 //
99 // Consider, for example, a production rate of 32 tokens per second and
100 // a consumption rate of 64 tokens per second:
101 // - First, there are 32 tokens in the bucket.
102 // - After 0.5 seconds, all 32 have been exhausted.
103 // - The call to `try_take` which exhausts the bucket refills the bucket
104 // with 0.5 seconds' worth of tokens, or 16 tokens.
105 //
106 // This process repeats itself, halving the number of tokens added (and
107 // halving the amount of time to exhaust the bucket) until, after an
108 // amount of time which is linear in the rate of tokens being added, a
109 // call to `try_take` adds only 0 or t in [1, 2) tokens. In either case,
110 // the bucket is left with less than 1 token (if `try_take` adds >= 1
111 // token, it also consumes 1 token immediately).
112 //
113 // This has the potential downside of, under heavy load, executing a
114 // slightly more complex algorithm on every call to `try_take`, which
115 // includes querying for the current time. I (joshlf) speculate that
116 // this isn't an issue in practice, but it's worth calling out in case
117 // it becomes an issue in the future.
118
119 let now = bindings_ctx.now();
120 // The duration since the last refill, or 1 second, whichever is
121 // shorter. If this is the first fill, pretend that a full second has
122 // elapsed since the previous refill. In reality, there was no previous
123 // refill, which means it's fine to fill the bucket completely.
124 let dur_since_last_refilled = self.last_refilled.map_or(SECOND, |last_refilled| {
125 let dur = now.saturating_duration_since(last_refilled);
126 if dur > SECOND {
127 SECOND
128 } else {
129 dur
130 }
131 });
132
133 // Do math in u128 to avoid overflow. Be careful to multiply first and
134 // then divide to minimize integer division rounding error. The result
135 // of the calculation should always fit in a `u64` because the ratio
136 // `dur_since_last_refilled / SECOND` is guaranteed not to be greater
137 // than 1.
138 let added_token_fractions = u64::try_from(
139 (u128::from(self.token_fractions_per_second) * dur_since_last_refilled.as_nanos())
140 / SECOND.as_nanos(),
141 )
142 .unwrap();
143
144 // Only refill the bucket if we can add at least 1 token. This avoids
145 // two failure modes:
146 // - If we always blindly added however many token fractions are
147 // available, then under heavy load, we might constantly add 0 token
148 // fractions (because less time has elapsed since `last_refilled` than
149 // is required to add a single token fraction) while still updating
150 // `last_refilled` each time. This would drop the observed rate to 0
151 // in the worst case.
152 // - If we always added >= 1 token fraction (as opposed to >= 1 full
153 // token), then we would run into integer math inaccuracy issues. In
154 // the worst case, `try_take` would be called after just less than the
155 // amount of time required to add two token fractions. The actual
156 // number of token fractions added would be rounded down to 1, and the
157 // observed rate would be slightly more than 1/2 of the ideal rate.
158 //
159 // By always adding at least 1 token, we ensure that the worst case
160 // behavior is when `try_take` is called after just less than the amount
161 // of time required to add `TOKEN_MULTIPLIER + 1` token fractions has
162 // elapsed. In this case, the actual number of token fractions added is
163 // rounded down to 1, and the observed rate is within `1 /
164 // TOKEN_MULTIPLIER` of the ideal rate.
165 if let Some(new_token_fractions) =
166 (self.token_fractions + added_token_fractions).checked_sub(TOKEN_MULTIPLIER)
167 {
168 self.token_fractions = new_token_fractions;
169 self.last_refilled = Some(now);
170 true
171 } else {
172 return false;
173 }
174 }
175}
176
177#[cfg(test)]
178pub(crate) mod tests {
179 use super::*;
180
181 use crate::testutil::{FakeInstant, FakeInstantCtx};
182
183 impl<I: crate::Instant> TokenBucket<I> {
184 /// Call `try_take` `n` times, and assert that it succeeds every time.
185 fn assert_take_n<BC: InstantContext<Instant = I>>(&mut self, bindings_ctx: &BC, n: usize) {
186 for _ in 0..n {
187 assert!(self.try_take(bindings_ctx));
188 }
189 }
190 }
191
    #[test]
    fn test_token_bucket() {
        /// Construct a `FakeInstantCtx` and a `TokenBucket` with a rate of 64
        /// tokens per second, and pass them to `f`.
        fn test<F: FnOnce(FakeInstantCtx, TokenBucket<FakeInstant>)>(f: F) {
            f(FakeInstantCtx::default(), TokenBucket::new(64));
        }

        // Test that, if we consume all of the tokens in the bucket, but do not
        // attempt to consume any more than that, the bucket will not be
        // updated.
        test(|mut ctx, mut bucket| {
            let epoch = ctx.now();
            assert!(bucket.try_take(&ctx));
            assert_eq!(bucket.last_refilled.unwrap(), epoch);
            assert_eq!(bucket.token_fractions, 63 * TOKEN_MULTIPLIER);

            // Sleep so that the current time will be different than the time at
            // which the `last_refilled` time was initialized. That way, we can
            // tell whether the `last_refilled` field was updated or not.
            ctx.sleep(SECOND);
            bucket.assert_take_n(&ctx, 63);
            assert_eq!(bucket.last_refilled.unwrap(), epoch);
            assert_eq!(bucket.token_fractions, 0);
        });

        // Test that, if we try to consume a token when the bucket is empty, it
        // will get refilled.
        test(|mut ctx, mut bucket| {
            let epoch = ctx.now();
            assert!(bucket.try_take(&ctx));
            assert_eq!(bucket.last_refilled.unwrap(), epoch);
            assert_eq!(bucket.token_fractions, 63 * TOKEN_MULTIPLIER);

            // Sleep for one second so that the bucket will be completely
            // refilled.
            ctx.sleep(SECOND);
            bucket.assert_take_n(&ctx, 64);
            assert_eq!(bucket.last_refilled.unwrap(), FakeInstant::from(SECOND));
            // 1 token was consumed by the last call to `try_take`.
            assert_eq!(bucket.token_fractions, 63 * TOKEN_MULTIPLIER);
        });

        // Test that, if more than 1 second has elapsed since the previous
        // refill, we still only fill with 1 second's worth of tokens.
        test(|mut ctx, mut bucket| {
            let epoch = ctx.now();
            assert!(bucket.try_take(&ctx));
            assert_eq!(bucket.last_refilled.unwrap(), epoch);
            assert_eq!(bucket.token_fractions, 63 * TOKEN_MULTIPLIER);

            ctx.sleep(SECOND * 2);
            bucket.assert_take_n(&ctx, 64);
            assert_eq!(bucket.last_refilled.unwrap(), FakeInstant::from(SECOND * 2));
            // 1 token was consumed by the last call to `try_take`.
            assert_eq!(bucket.token_fractions, 63 * TOKEN_MULTIPLIER);
        });

        // Test that, if we refill the bucket when less then a second has
        // elapsed, a proportional amount of the bucket is refilled.
        test(|mut ctx, mut bucket| {
            let epoch = ctx.now();
            assert!(bucket.try_take(&ctx));
            assert_eq!(bucket.last_refilled.unwrap(), epoch);
            assert_eq!(bucket.token_fractions, 63 * TOKEN_MULTIPLIER);

            ctx.sleep(SECOND / 2);
            bucket.assert_take_n(&ctx, 64);
            assert_eq!(bucket.last_refilled.unwrap(), FakeInstant::from(SECOND / 2));
            // Since only half a second had elapsed since the previous refill,
            // only half of the tokens were refilled. 1 was consumed by the last
            // call to `try_take`.
            assert_eq!(bucket.token_fractions, 31 * TOKEN_MULTIPLIER);
        });

        // Test that, if we try to consume a token when the bucket is empty and
        // not enough time has elapsed to allow for any tokens to be added,
        // `try_take` will fail and the bucket will remain empty.
        test(|mut ctx, mut bucket| {
            // Allow 1/128 of a second to elapse so we know we're not just
            // dealing with a consequence of no time having elapsed. The
            // "correct" number of tokens to add after 1/128 of a second is
            // 64/128 = 1/2, which will be rounded down to 0 (the refill is
            // skipped entirely because less than one whole token is available).
            let epoch = ctx.now();
            bucket.assert_take_n(&ctx, 64);
            ctx.sleep(SECOND / 128);
            assert!(!bucket.try_take(&ctx));
            assert_eq!(bucket.last_refilled.unwrap(), epoch);
            assert_eq!(bucket.token_fractions, 0);
        });

        // Test that, as long as we consume tokens at exactly the right rate, we
        // never fail to consume a token.
        test(|mut ctx, mut bucket| {
            // Initialize the `last_refilled` time and then drain the bucket,
            // leaving the `last_refilled` time at t=0 and the bucket empty.
            bucket.assert_take_n(&ctx, 64);
            for _ in 0..1_000 {
                // `Duration`s store nanoseconds under the hood, and 64 divides
                // 1e9 evenly, so this is lossless.
                ctx.sleep(SECOND / 64);
                assert!(bucket.try_take(&ctx));
                assert_eq!(bucket.token_fractions, 0);
                assert_eq!(bucket.last_refilled.unwrap(), ctx.now());
            }
        });

        // Test that, if we consume tokens too quickly, we succeed in consuming
        // tokens the correct proportion of the time.
        //
        // Test with rates close to 1 (2/1 through 5/4) and rates much larger
        // than 1 (3/1 through 6/1).
        for (numer, denom) in
            [(2, 1), (3, 2), (4, 3), (5, 4), (3, 1), (4, 1), (5, 1), (6, 1)].iter()
        {
            test(|mut ctx, mut bucket| {
                // Initialize the `last_refilled` time and then drain the
                // bucket, leaving the `last_refilled` time at t=0 and the
                // bucket empty.
                bucket.assert_take_n(&ctx, 64);

                const ATTEMPTS: u32 = 1_000;
                let mut successes = 0;
                for _ in 0..ATTEMPTS {
                    // In order to speed up by a factor of numer/denom, we
                    // multiply the duration between tries by its inverse,
                    // denom/numer.
                    ctx.sleep((SECOND * *denom) / (64 * *numer));
                    if bucket.try_take(&ctx) {
                        successes += 1;
                        assert_eq!(bucket.last_refilled.unwrap(), ctx.now());
                    }
                }

                // The observed rate can be up to 1/TOKEN_MULTIPLIER off in
                // either direction.
                let ideal_successes = (ATTEMPTS * denom) / numer;
                let mult = u32::try_from(TOKEN_MULTIPLIER).unwrap();
                assert!(successes <= (ideal_successes * (mult + 1)) / mult);
                assert!(successes >= (ideal_successes * (mult - 1)) / mult);
            });
        }
    }
335
336 #[test]
337 fn test_token_bucket_new() {
338 // Test that `new` doesn't panic if given 2^56 - 1.
339 let _: TokenBucket<()> = TokenBucket::<()>::new((1 << 56) - 1);
340 }
341
342 #[test]
343 #[should_panic]
344 fn test_token_bucket_new_panics() {
345 // Test that `new` panics if given 2^56
346 let _: TokenBucket<()> = TokenBucket::<()>::new(1 << 56);
347 }
348}
349
350#[cfg(any(test, benchmark))]
351pub(crate) mod benchmarks {
352 use super::*;
353
354 use crate::bench;
355 use crate::testutil::{Bencher, FakeInstantCtx};
356
357 fn bench_try_take<B: Bencher>(b: &mut B, enforced_rate: u64, try_rate: u32) {
358 let sleep = SECOND / try_rate;
359 let mut ctx = FakeInstantCtx::default();
360 let mut bucket = TokenBucket::new(enforced_rate);
361 b.iter(|| {
362 ctx.sleep(sleep);
363 let _: bool = B::black_box(bucket.try_take(B::black_box(&ctx)));
364 });
365 }
366
    // These benchmarks measure the time taken to remove a token from the token
    // bucket (using try_take) when tokens are being removed at various rates
    // (relative to the rate at which they fill into the bucket).
    // These benchmarks use the fastest possible `InstantContext`, and should be
    // considered an upper bound on performance.

    // Call `try_take` at 1/64 the enforced rate (bucket is almost always full).
    bench!(bench_try_take_slow, |b| bench_try_take(b, 64, 1));
    // Call `try_take` at 1/2 the enforced rate.
    bench!(bench_try_take_half_rate, |b| bench_try_take(b, 64, 32));
    // Call `try_take` at exactly the enforced rate.
    bench!(bench_try_take_equal_rate, |b| bench_try_take(b, 64, 64));
    // Call `try_take` at 65/64 the enforced rate (just barely over the limit).
    bench!(bench_try_take_almost_equal_rate, |b| bench_try_take(b, 64, 65));
    // Call `try_take` at 2x the enforced rate.
    bench!(bench_try_take_double_rate, |b| bench_try_take(b, 64, 64 * 2));
383
384 #[cfg(benchmark)]
385 pub fn add_benches(b: criterion::Benchmark) -> criterion::Benchmark {
386 let mut b = b.with_function("TokenBucket/TryTake/Slow", bench_try_take_slow);
387 b = b.with_function("TokenBucket/TryTake/HalfRate", bench_try_take_half_rate);
388 b = b.with_function("TokenBucket/TryTake/EqualRate", bench_try_take_equal_rate);
389 b = b
390 .with_function("TokenBucket/TryTake/AlmostEqualRate", bench_try_take_almost_equal_rate);
391 b.with_function("TokenBucket/TryTake/DoubleRate", bench_try_take_double_rate)
392 }
393}