Skip to main content

storage_device/
buffer_allocator.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::buffer::{Buffer, round_down, round_up};
6use event_listener::{Event, EventListener};
7use fuchsia_sync::Mutex;
8use futures::{Future, FutureExt as _};
9use std::collections::BTreeMap;
10use std::ops::Range;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
#[cfg(target_os = "fuchsia")]
mod buffer_source {
    use fuchsia_runtime::vmar_root_self;
    use std::ops::Range;
    use std::sync::Arc;

    /// A buffer source backed by a VMO.
    ///
    /// The VMO is mapped into the root VMAR when the source is created and unmapped again when it
    /// is dropped.
    #[derive(Debug)]
    pub struct BufferSource {
        // Base address of the VMO mapping.
        base: *mut u8,
        // Length of the mapping (and of the VMO) in bytes.
        size: usize,
        // The VMO backing the mapping.
        vmo: Arc<zx::Vmo>,
    }

    // SAFETY: This is required for the *mut u8, which is just the base address of the VMO mapping
    // and doesn't stop us making BufferSource Send and Sync.
    unsafe impl Send for BufferSource {}
    unsafe impl Sync for BufferSource {}

    impl BufferSource {
        /// Creates a VMO of `size` bytes named "transfer-buf" and maps it read/write into the
        /// root VMAR.
        ///
        /// # Panics
        ///
        /// Panics if the VMO cannot be created, named or mapped.
        pub fn new(size: usize) -> Self {
            let vmo = Arc::new(zx::Vmo::create(size as u64).unwrap());
            let name = zx::Name::new("transfer-buf").unwrap();
            vmo.set_name(&name).unwrap();
            let flags = zx::VmarFlags::PERM_READ
                | zx::VmarFlags::PERM_WRITE
                | zx::VmarFlags::MAP_RANGE
                | zx::VmarFlags::REQUIRE_NON_RESIZABLE;
            let base = vmar_root_self().map(0, &vmo, 0, size, flags).unwrap() as *mut u8;
            Self { base, size, vmo }
        }

        /// Returns a raw slice pointer covering the whole mapping.
        pub fn slice(&self) -> *mut [u8] {
            std::ptr::slice_from_raw_parts_mut(self.base, self.size)
        }

        /// Returns the size of the mapping in bytes.
        pub fn size(&self) -> usize {
            self.size
        }

        /// Returns the VMO backing this source.
        pub fn vmo(&self) -> &Arc<zx::Vmo> {
            &self.vmo
        }

        /// Returns a mutable slice covering `range` of the mapping.
        ///
        /// # Safety
        ///
        /// The caller must ensure that no other live reference overlaps `range`.
        ///
        /// # Panics
        ///
        /// Panics if `range` is inverted or does not lie within the mapping.
        #[allow(clippy::mut_from_ref)]
        pub(super) unsafe fn sub_slice(&self, range: &Range<usize>) -> &mut [u8] {
            // Without this check an inverted range would make `range.end - range.start` wrap in
            // builds without overflow checks, producing a slice that extends far beyond the
            // mapping.
            assert!(range.start <= range.end);
            assert!(range.start < self.size && range.end <= self.size);
            // SAFETY: The range was just checked to be well-formed and to lie within the mapping;
            // exclusivity of the returned slice is the caller's obligation.
            unsafe {
                std::slice::from_raw_parts_mut(self.base.add(range.start), range.end - range.start)
            }
        }

        /// Commits the range in memory to avoid future page faults.
        pub fn commit_range(&self, range: Range<usize>) -> Result<(), zx::Status> {
            self.vmo.op_range(zx::VmoOp::COMMIT, range.start as u64, range.len() as u64)
        }

        /// Zeroes out the range so the kernel can reclaim the pages.
        ///
        /// Any error from the underlying VMO operation is ignored.
        ///
        /// # Safety
        ///
        /// The range must not be allocated.
        pub(super) unsafe fn clean_range(&self, range: Range<usize>) {
            let _ = self.vmo.op_range(zx::VmoOp::ZERO, range.start as u64, range.len() as u64);
        }
    }

    impl Drop for BufferSource {
        fn drop(&mut self) {
            // SAFETY: This balances the `map` in `new` above.
            unsafe {
                let _ = vmar_root_self().unmap(self.base as usize, self.size);
            }
        }
    }
}
90
#[cfg(not(target_os = "fuchsia"))]
mod buffer_source {
    use std::cell::UnsafeCell;
    use std::ops::Range;
    use std::pin::Pin;

    /// A basic heap-backed buffer source, used when not building for Fuchsia.
    #[derive(Debug)]
    pub struct BufferSource {
        // Interior mutability is needed so that mutable slices can be handed out from a shared
        // reference (see |sub_slice|). An UnsafeCell is used rather than a Mutex to avoid the
        // locking cost; the Buffer objects handed out must never overlap, which is already a
        // requirement for correctness.
        data: UnsafeCell<Pin<Vec<u8>>>,
    }

    // Safe because none of the fields in BufferSource are modified, except the contents of |data|,
    // but that is managed by the BufferAllocator.
    unsafe impl Sync for BufferSource {}

    impl BufferSource {
        /// Creates a zero-filled source of `size` bytes.
        pub fn new(size: usize) -> Self {
            let backing: Vec<u8> = vec![0; size];
            Self { data: UnsafeCell::new(Pin::new(backing)) }
        }

        /// Returns the size of the backing buffer in bytes.
        pub fn size(&self) -> usize {
            // Safe because the reference goes out of scope as soon as we have read the length.
            let data: &Pin<Vec<u8>> = unsafe { &*self.data.get() };
            data.len()
        }

        /// Returns a mutable slice covering `range` of the backing buffer.
        ///
        /// # Safety
        ///
        /// The caller must ensure that no other live reference overlaps `range`.
        #[allow(clippy::mut_from_ref)]
        pub(super) unsafe fn sub_slice(&self, range: &Range<usize>) -> &mut [u8] {
            assert!(range.start < self.size() && range.end <= self.size());
            let data: &mut Pin<Vec<u8>> = unsafe { &mut *self.data.get() };
            &mut data[range.start..range.end]
        }

        /// Zeroes out the range.
        ///
        /// # Safety
        ///
        /// The range must not be allocated.
        pub(super) unsafe fn clean_range(&self, range: Range<usize>) {
            let bytes = unsafe { self.sub_slice(&range) };
            bytes.fill(0);
        }
    }
}
137
138pub use buffer_source::BufferSource;
139
// Stores a list of offsets into a BufferSource, each marking the start of a free range. The size
// of the free ranges is determined by which FreeList we are looking at (its index in
// |Inner::free_lists| is the order of the ranges it holds).
// FreeLists are sorted, which is what allows them to be binary-searched when freeing.
type FreeList = Vec<usize>;
144
// The mutable allocator state, kept behind |BufferAllocator::inner|'s mutex.
#[derive(Debug)]
struct Inner {
    // The index corresponds to the order of free memory blocks in the free list, i.e.
    // |free_lists[n]| holds the offsets of free blocks of |block_size * 2^n| bytes.
    free_lists: Vec<FreeList>,
    // Maps offsets to allocated length (the actual length, not the size requested by the client).
    allocation_map: BTreeMap<usize, usize>,
}
152
/// BufferAllocator creates Buffer objects to be used for block device I/O requests.
///
/// This is implemented through a simple buddy allocation scheme.
#[derive(Debug)]
pub struct BufferAllocator {
    // The size in bytes of the smallest (order 0) allocation unit.
    block_size: usize,
    // The memory that allocated buffers are carved out of.
    source: BufferSource,
    // Free lists and allocation records.
    inner: Mutex<Inner>,
    // Notified whenever a buffer is freed, to wake tasks waiting for space.
    event: Event,
}
163
164// Returns the smallest order which is at least |size| bytes.
165fn order(size: usize, block_size: usize) -> usize {
166    if size <= block_size {
167        return 0;
168    }
169    let nblocks = round_up(size, block_size) / block_size;
170    nblocks.next_power_of_two().trailing_zeros() as usize
171}
172
// Returns the largest order whose block size is no more than |size| bytes.
//
// Panics if |size| is smaller than a single block.
fn order_fit(size: usize, block_size: usize) -> usize {
    assert!(size >= block_size);
    // Only whole blocks count: rounding the block count up would let a size that is not a
    // multiple of |block_size| (e.g. 7 bytes with 2-byte blocks) map to an order that is larger
    // than |size|.
    let nblocks = size / block_size;
    // floor(log2(nblocks)); |nblocks| is at least 1 because of the assertion above.
    nblocks.ilog2() as usize
}
183
// Returns the size in bytes of a block of the given order, i.e. |block_size| * 2^|order|.
fn size_for_order(order: usize, block_size: usize) -> usize {
    let blocks_per_region: usize = 1 << (order as u32);
    block_size * blocks_per_region
}
187
188fn initial_free_lists(size: usize, block_size: usize) -> Vec<FreeList> {
189    let size = round_down(size, block_size);
190    assert!(block_size <= size);
191    assert!(block_size.is_power_of_two());
192    let max_order = order_fit(size, block_size);
193    let mut free_lists = Vec::with_capacity(max_order + 1);
194    for _ in 0..=max_order {
195        free_lists.push(FreeList::new())
196    }
197    let mut offset = 0;
198    while offset < size {
199        let order = order_fit(size - offset, block_size);
200        let size = size_for_order(order, block_size);
201        free_lists[order].push(offset);
202        offset += size;
203    }
204    free_lists
205}
206
/// A future which will resolve to an allocated [`Buffer`].
pub struct BufferFuture<'a> {
    // The allocator to allocate from.
    allocator: &'a BufferAllocator,
    // The requested size in bytes.
    size: usize,
    // Set once an allocation attempt has failed; resolves when it is worth trying again.
    listener: Option<EventListener>,
}
213
214impl<'a> Future for BufferFuture<'a> {
215    type Output = Buffer<'a>;
216
217    fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
218        if let Some(listener) = self.listener.as_mut() {
219            futures::ready!(listener.poll_unpin(context));
220        }
221        // Loop because we need to deal with the case where `listener` is ready immediately upon
222        // creation, in which case we ought to retry the allocation.
223        loop {
224            match self.allocator.try_allocate_buffer(self.size) {
225                Ok(buffer) => return Poll::Ready(buffer),
226                Err(mut listener) => {
227                    if listener.poll_unpin(context).is_pending() {
228                        self.listener = Some(listener);
229                        return Poll::Pending;
230                    }
231                }
232            }
233        }
234    }
235}
236
237impl BufferAllocator {
238    pub fn new(block_size: usize, source: BufferSource) -> Self {
239        let free_lists = initial_free_lists(source.size(), block_size);
240        Self {
241            block_size,
242            source,
243            inner: Mutex::new(Inner { free_lists, allocation_map: BTreeMap::new() }),
244            event: Event::new(),
245        }
246    }
247
248    pub fn block_size(&self) -> usize {
249        self.block_size
250    }
251
252    pub fn buffer_source(&self) -> &BufferSource {
253        &self.source
254    }
255
256    /// Takes the buffer source from the allocator and consumes the allocator.
257    pub fn take_buffer_source(self) -> BufferSource {
258        self.source
259    }
260
261    /// Allocates a Buffer with capacity for |size| bytes. Panics if the allocation exceeds the pool
262    /// size.  Blocks until there are enough bytes available to satisfy the request.
263    ///
264    /// The allocated buffer will be block-aligned and the padding up to block alignment can also
265    /// be used by the buffer.
266    ///
267    /// Allocation is O(lg(N) + M), where N = size and M = number of allocations.
268    pub fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
269        BufferFuture { allocator: self, size, listener: None }
270    }
271
272    /// Like |allocate_buffer|, but returns an EventListener if the allocation cannot be satisfied.
273    /// The listener will signal when the caller should try again.
274    pub fn try_allocate_buffer(&self, size: usize) -> Result<Buffer<'_>, EventListener> {
275        if size > self.source.size() {
276            panic!("Allocation of {} bytes would exceed limit {}", size, self.source.size());
277        }
278        let mut inner = self.inner.lock();
279        let requested_order = order(size, self.block_size());
280        assert!(requested_order < inner.free_lists.len());
281        // Pick the smallest possible order with a free entry.
282        let mut order = {
283            let mut idx = requested_order;
284            loop {
285                if idx >= inner.free_lists.len() {
286                    return Err(self.event.listen());
287                }
288                if !inner.free_lists[idx].is_empty() {
289                    break idx;
290                }
291                idx += 1;
292            }
293        };
294
295        // Split the free region until it's the right size.
296        let offset = inner.free_lists[order].pop().unwrap();
297        while order > requested_order {
298            order -= 1;
299            assert!(inner.free_lists[order].is_empty());
300            inner.free_lists[order].push(offset + self.size_for_order(order));
301        }
302
303        inner.allocation_map.insert(offset, self.size_for_order(order));
304        let range = offset..offset + size;
305        log::debug!(range:?, bytes_used = self.size_for_order(order); "Allocated");
306
307        // Safety is ensured by the allocator not double-allocating any regions.
308        Ok(Buffer::new(unsafe { self.source.sub_slice(&range) }, range, &self))
309    }
310
311    /// Deallocation is O(lg(N) + M), where N = size and M = number of allocations.
312    #[doc(hidden)]
313    pub(super) fn free_buffer(&self, range: Range<usize>) {
314        let mut inner = self.inner.lock();
315        let mut offset = range.start;
316        let size = inner
317            .allocation_map
318            .remove(&offset)
319            .unwrap_or_else(|| panic!("No allocation record found for {:?}", range));
320        assert!(range.end - range.start <= size);
321        log::debug!(range:?, bytes_used = size; "Freeing");
322
323        // Merge as many free slots as we can.
324        let mut order = order(size, self.block_size());
325        while order < inner.free_lists.len() - 1 {
326            let buddy = self.find_buddy(offset, order);
327            let idx = if let Ok(idx) = inner.free_lists[order].binary_search(&buddy) {
328                idx
329            } else {
330                break;
331            };
332            inner.free_lists[order].remove(idx);
333            offset = std::cmp::min(offset, buddy);
334            order += 1;
335        }
336
337        let idx = match inner.free_lists[order].binary_search(&offset) {
338            Ok(_) => panic!("Unexpectedly found {} in free list {}", offset, order),
339            Err(idx) => idx,
340        };
341        inner.free_lists[order].insert(idx, offset);
342
343        // Notify all stuck tasks.  This might be inefficient, but it's simple and correct.
344        self.event.notify(usize::MAX);
345    }
346
347    fn size_for_order(&self, order: usize) -> usize {
348        size_for_order(order, self.block_size)
349    }
350
351    fn find_buddy(&self, offset: usize, order: usize) -> usize {
352        offset ^ self.size_for_order(order)
353    }
354
355    /// Zeroes out all unnallocated ranges in the transfer buffer.
356    pub fn clean_transfer_buffer(&self) {
357        let inner = self.inner.lock();
358        for (n, free_list) in inner.free_lists.iter().enumerate() {
359            for offset in free_list {
360                // SAFETY: The range is not allocated.
361                unsafe {
362                    self.source.clean_range(*offset..(*offset + size_for_order(n, self.block_size)))
363                };
364            }
365        }
366    }
367}
368
369#[cfg(test)]
370mod tests {
371    use crate::buffer_allocator::{BufferAllocator, BufferSource, order};
372    use fuchsia_async as fasync;
373    use futures::future::join_all;
374    use futures::pin_mut;
375    use rand::seq::IndexedRandom;
376    use rand::{Rng, rng};
377    use std::sync::Arc;
378    use std::sync::atomic::{AtomicBool, Ordering};
379
380    #[fuchsia::test]
381    async fn test_odd_sized_buffer_source() {
382        let source = BufferSource::new(123);
383        let allocator = BufferAllocator::new(2, source);
384
385        // 123 == 64 + 32 + 16 + 8 + 2 + 1. (The last byte is unusable.)
386        let sizes = vec![64, 32, 16, 8, 2];
387        let mut bufs = vec![];
388        for size in sizes.iter() {
389            bufs.push(allocator.allocate_buffer(*size).await);
390        }
391        for (expected_size, buf) in sizes.iter().zip(bufs.iter()) {
392            assert_eq!(*expected_size, buf.len());
393        }
394        assert!(allocator.try_allocate_buffer(2).is_err());
395    }
396
397    #[fuchsia::test]
398    async fn test_allocate_buffer_read_write() {
399        let source = BufferSource::new(1024 * 1024);
400        let allocator = BufferAllocator::new(8192, source);
401
402        let mut buf = allocator.allocate_buffer(8192).await;
403        buf.as_mut_slice().fill(0xaa as u8);
404        let mut vec = vec![0 as u8; 8192];
405        vec.copy_from_slice(buf.as_slice());
406        assert_eq!(vec, vec![0xaa as u8; 8192]);
407    }
408
409    #[fuchsia::test]
410    async fn test_allocate_buffer_consecutive_calls_do_not_overlap() {
411        let source = BufferSource::new(1024 * 1024);
412        let allocator = BufferAllocator::new(8192, source);
413
414        let buf1 = allocator.allocate_buffer(8192).await;
415        let buf2 = allocator.allocate_buffer(8192).await;
416        assert!(buf1.range().end <= buf2.range().start || buf2.range().end <= buf1.range().start);
417    }
418
419    #[fuchsia::test]
420    async fn test_allocate_many_buffers() {
421        let source = BufferSource::new(1024 * 1024);
422        let allocator = BufferAllocator::new(8192, source);
423
424        for _ in 0..10 {
425            let _ = allocator.allocate_buffer(8192).await;
426        }
427    }
428
429    #[fuchsia::test]
430    async fn test_allocate_small_buffers_dont_overlap() {
431        let source = BufferSource::new(1024 * 1024);
432        let allocator = BufferAllocator::new(8192, source);
433
434        let buf1 = allocator.allocate_buffer(1).await;
435        let buf2 = allocator.allocate_buffer(1).await;
436        assert!(buf1.range().end <= buf2.range().start || buf2.range().end <= buf1.range().start);
437    }
438
439    #[fuchsia::test]
440    async fn test_allocate_large_buffer() {
441        let source = BufferSource::new(1024 * 1024);
442        let allocator = BufferAllocator::new(8192, source);
443
444        let mut buf = allocator.allocate_buffer(1024 * 1024).await;
445        assert_eq!(buf.len(), 1024 * 1024);
446        buf.as_mut_slice().fill(0xaa as u8);
447        let mut vec = vec![0 as u8; 1024 * 1024];
448        vec.copy_from_slice(buf.as_slice());
449        assert_eq!(vec, vec![0xaa as u8; 1024 * 1024]);
450    }
451
452    #[fuchsia::test]
453    async fn test_allocate_large_buffer_after_smaller_buffers() {
454        let source = BufferSource::new(1024 * 1024);
455        let allocator = BufferAllocator::new(8192, source);
456
457        {
458            let mut buffers = vec![];
459            while let Ok(buffer) = allocator.try_allocate_buffer(8192) {
460                buffers.push(buffer);
461            }
462        }
463        let buf = allocator.allocate_buffer(1024 * 1024).await;
464        assert_eq!(buf.len(), 1024 * 1024);
465    }
466
467    #[fuchsia::test]
468    async fn test_allocate_at_limits() {
469        let source = BufferSource::new(1024 * 1024);
470        let allocator = BufferAllocator::new(8192, source);
471
472        let mut buffers = vec![];
473        while let Ok(buffer) = allocator.try_allocate_buffer(8192) {
474            buffers.push(buffer);
475        }
476        // Deallocate a single buffer, and reallocate a single one back.
477        buffers.pop();
478        let buf = allocator.allocate_buffer(8192).await;
479        assert_eq!(buf.len(), 8192);
480    }
481
    /// Stress test: ten tasks concurrently perform random allocations and deallocations against
    /// a shared allocator, checking that each buffer holds the value that was written to it.
    #[fuchsia::test(threads = 10)]
    async fn test_random_allocs_deallocs() {
        let source = BufferSource::new(16 * 1024 * 1024);
        let bs = 512;
        let allocator = Arc::new(BufferAllocator::new(bs, source));

        join_all((0..10).map(|_| {
            let allocator = allocator.clone();
            fasync::Task::spawn(async move {
                let mut rng = rng();
                enum Op {
                    Alloc,
                    Dealloc,
                }
                let ops = vec![Op::Alloc, Op::Dealloc];
                let mut buffers = vec![];
                for _ in 0..1000 {
                    match ops.choose(&mut rng).unwrap() {
                        Op::Alloc => {
                            // Rather than a uniform distribution 1..64K, first pick an order and
                            // then pick a size within that. For example, we might pick order 3,
                            // which would give us 8 * 512..16 * 512 as our possible range.
                            // This way we don't bias towards larger allocations too much.
                            let order: usize = rng.random_range(order(1, bs)..order(65536 + 1, bs));
                            let size: usize = rng.random_range(
                                bs * 2_usize.pow(order as u32)..bs * 2_usize.pow(order as u32 + 1),
                            );
                            // A failed allocation (pool currently too full) is simply skipped.
                            if let Ok(mut buf) = allocator.try_allocate_buffer(size) {
                                let val = rng.random::<u8>();
                                buf.as_mut_slice().fill(val);
                                for v in buf.as_slice() {
                                    assert_eq!(v, &val);
                                }
                                buffers.push(buf);
                            }
                        }
                        Op::Dealloc if !buffers.is_empty() => {
                            // Dropping the removed buffer returns it to the allocator.
                            let idx = rng.random_range(0..buffers.len());
                            buffers.remove(idx);
                        }
                        _ => {}
                    };
                }
            })
        }))
        .await;
    }
529
    /// Exercises mutable and shared sub-slicing of a buffer, checking lengths, absolute ranges
    /// and that writes through nested sub-slices land in the right place.
    #[fuchsia::test]
    async fn test_buffer_refs() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(512, source);

        // Allocate one buffer first so that |buf| is not starting at offset 0. This helps catch
        // bugs.
        let _buf = allocator.allocate_buffer(512).await;
        let mut buf = allocator.allocate_buffer(4096).await;
        let base = buf.range().start;
        {
            // Mutable views: fill 1000..2000 with 0xbb, then overwrite its first and last 100
            // bytes through nested sub-slices.
            let mut bref = buf.subslice_mut(1000..2000);
            assert_eq!(bref.len(), 1000);
            assert_eq!(bref.range(), base + 1000..base + 2000);
            bref.as_mut_slice().fill(0xbb);
            {
                let mut bref2 = bref.reborrow().subslice_mut(0..100);
                assert_eq!(bref2.len(), 100);
                assert_eq!(bref2.range(), base + 1000..base + 1100);
                bref2.as_mut_slice().fill(0xaa);
            }
            {
                let mut bref2 = bref.reborrow().subslice_mut(900..1000);
                assert_eq!(bref2.len(), 100);
                assert_eq!(bref2.range(), base + 1900..base + 2000);
                bref2.as_mut_slice().fill(0xcc);
            }
            assert_eq!(bref.as_slice()[..100], vec![0xaa; 100]);
            assert_eq!(bref.as_slice()[100..900], vec![0xbb; 800]);

            let bref = bref.subslice_mut(900..);
            assert_eq!(bref.len(), 100);
            assert_eq!(bref.as_slice(), vec![0xcc; 100]);
        }
        {
            // Shared views: the bytes outside 1000..2000 were never written and read as zero.
            let bref = buf.as_ref();
            assert_eq!(bref.len(), 4096);
            assert_eq!(bref.range(), base..base + 4096);
            assert_eq!(bref.as_slice()[0..1000], vec![0x00; 1000]);
            {
                let bref2 = bref.subslice(1000..2000);
                assert_eq!(bref2.len(), 1000);
                assert_eq!(bref2.range(), base + 1000..base + 2000);
                assert_eq!(bref2.as_slice()[..100], vec![0xaa; 100]);
                assert_eq!(bref2.as_slice()[100..900], vec![0xbb; 800]);
                assert_eq!(bref2.as_slice()[900..1000], vec![0xcc; 100]);
            }

            let bref = bref.subslice(2048..);
            assert_eq!(bref.len(), 2048);
            assert_eq!(bref.as_slice(), vec![0x00; 2048]);
        }
    }
583
    /// Exercises splitting a buffer into adjacent halves, both mutably and shared, including a
    /// zero-length split.
    #[fuchsia::test]
    async fn test_buffer_split() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(512, source);

        // Allocate one buffer first so that |buf| is not starting at offset 0. This helps catch
        // bugs.
        let _buf = allocator.allocate_buffer(512).await;
        let mut buf = allocator.allocate_buffer(4096).await;
        let base = buf.range().start;
        {
            // Fill the two halves with different patterns through a mutable split.
            let bref = buf.as_mut();
            let (mut s1, mut s2) = bref.split_at_mut(2048);
            assert_eq!(s1.len(), 2048);
            assert_eq!(s1.range(), base..base + 2048);
            s1.as_mut_slice().fill(0xaa);
            assert_eq!(s2.len(), 2048);
            assert_eq!(s2.range(), base + 2048..base + 4096);
            s2.as_mut_slice().fill(0xbb);
        }
        {
            // Split into [0, 1), [1, 2048), an empty slice at 2048, and [2048, 4096).
            let bref = buf.as_ref();
            let (s1, s2) = bref.split_at(1);
            let (s2, s3) = s2.split_at(2047);
            let (s3, s4) = s3.split_at(0);
            assert_eq!(s1.len(), 1);
            assert_eq!(s1.range(), base..base + 1);
            assert_eq!(s2.len(), 2047);
            assert_eq!(s2.range(), base + 1..base + 2048);
            assert_eq!(s3.len(), 0);
            assert_eq!(s3.range(), base + 2048..base + 2048);
            assert_eq!(s4.len(), 2048);
            assert_eq!(s4.range(), base + 2048..base + 4096);
            assert_eq!(s1.as_slice(), vec![0xaa; 1]);
            assert_eq!(s2.as_slice(), vec![0xaa; 2047]);
            assert_eq!(s3.as_slice(), &[] as &[u8]);
            assert_eq!(s4.as_slice(), vec![0xbb; 2048]);
        }
    }
623
    /// Allocations that can't be satisfied stay pending until the buffers occupying the pool
    /// have been dropped.
    #[fuchsia::test]
    async fn test_blocking_allocation() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = Arc::new(BufferAllocator::new(512, source));

        // Together, buf1 and buf2 occupy the entire pool.
        let buf1 = allocator.allocate_buffer(512 * 1024).await;
        let buf2 = allocator.allocate_buffer(512 * 1024).await;
        let bufs_dropped = Arc::new(AtomicBool::new(false));

        // buf3_fut should block until both buf1 and buf2 are done.
        let allocator_clone = allocator.clone();
        let bufs_dropped_clone = bufs_dropped.clone();
        let buf3_fut = async move {
            allocator_clone.allocate_buffer(1024 * 1024).await;
            assert!(bufs_dropped_clone.load(Ordering::Relaxed), "Allocation finished early");
        };
        pin_mut!(buf3_fut);

        // Each of buf_futs should block until buf1 and buf2 have been dropped; nothing stronger
        // is asserted about ordering (see the comment inside the future).
        let mut buf_futs = vec![];
        for _ in 0..16 {
            let allocator_clone = allocator.clone();
            let bufs_dropped_clone = bufs_dropped.clone();
            let fut = async move {
                allocator_clone.allocate_buffer(64 * 1024).await;
                // We can't say with certainty that buf3 proceeded first, nor can we ensure these
                // allocations proceed in order, but we can make sure that at least buf1/buf2 were
                // done (since they exhausted the pool).
                assert!(bufs_dropped_clone.load(Ordering::Relaxed), "Allocation finished early");
            };
            buf_futs.push(fut);
        }

        // The final future releases the pool.
        futures::join!(buf3_fut, join_all(buf_futs), async move {
            std::mem::drop(buf1);
            std::mem::drop(buf2);
            bufs_dropped.store(true, Ordering::Relaxed);
        });
    }
663
664    #[fuchsia::test]
665    async fn test_clean_entire_transfer_buffer() {
666        const BUFFER_SIZE: usize = 4096;
667        let source = BufferSource::new(BUFFER_SIZE);
668        let allocator = Arc::new(BufferAllocator::new(512, source));
669
670        let mut buf = allocator.allocate_buffer(BUFFER_SIZE).await;
671        buf.as_mut_slice().fill(0xaa);
672        std::mem::drop(buf);
673
674        allocator.clean_transfer_buffer();
675        let buf = allocator.allocate_buffer(BUFFER_SIZE).await;
676        assert_eq!(buf.as_slice(), vec![0; BUFFER_SIZE]);
677    }
678
679    #[fuchsia::test]
680    async fn test_clean_transfer_buffer_around_allocation() {
681        let source = BufferSource::new(4096);
682        let allocator = Arc::new(BufferAllocator::new(512, source));
683
684        let mut buf1 = allocator.allocate_buffer(1024).await;
685        buf1.as_mut_slice().fill(0xaa);
686        let mut buf2 = allocator.allocate_buffer(1024).await;
687        buf2.as_mut_slice().fill(0xbb);
688        assert_eq!(buf2.range().start, 1024);
689        let mut buf3 = allocator.allocate_buffer(2048).await;
690        buf3.as_mut_slice().fill(0xcc);
691        std::mem::drop(buf1);
692        std::mem::drop(buf3);
693
694        allocator.clean_transfer_buffer();
695
696        let buf1 = allocator.allocate_buffer(1024).await;
697        assert_eq!(buf1.as_slice(), vec![0; 1024]);
698
699        assert_eq!(buf2.as_slice(), vec![0xbb; 1024]);
700
701        let buf3 = allocator.allocate_buffer(2048).await;
702        assert_eq!(buf3.as_slice(), vec![0; 2048]);
703    }
704}