storage_device/
buffer_allocator.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use crate::buffer::{Buffer, round_down, round_up};
6use event_listener::{Event, EventListener};
7use fuchsia_sync::Mutex;
8use futures::{Future, FutureExt as _};
9use std::collections::BTreeMap;
10use std::ops::Range;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14#[cfg(target_os = "fuchsia")]
15mod buffer_source {
16    use fuchsia_runtime::vmar_root_self;
17    use std::ops::Range;
18    use std::sync::Arc;
19    use zx::{self as zx, AsHandleRef};
20
21    /// A buffer source backed by a VMO.
22    #[derive(Debug)]
23    pub struct BufferSource {
24        base: *mut u8,
25        size: usize,
26        vmo: Arc<zx::Vmo>,
27    }
28
29    // SAFETY: This is required for the *mut u8 which is just the base address of the VMO mapping
30    // and doesn't stop us making BufferSource Send and Sync.
31    unsafe impl Send for BufferSource {}
32    unsafe impl Sync for BufferSource {}
33
34    impl BufferSource {
35        pub fn new(size: usize) -> Self {
36            let vmo = Arc::new(zx::Vmo::create(size as u64).unwrap());
37            let name = zx::Name::new("transfer-buf").unwrap();
38            vmo.set_name(&name).unwrap();
39            let flags = zx::VmarFlags::PERM_READ
40                | zx::VmarFlags::PERM_WRITE
41                | zx::VmarFlags::MAP_RANGE
42                | zx::VmarFlags::REQUIRE_NON_RESIZABLE;
43            let base = vmar_root_self().map(0, &vmo, 0, size, flags).unwrap() as *mut u8;
44            Self { base, size, vmo }
45        }
46
47        pub fn slice(&self) -> *mut [u8] {
48            std::ptr::slice_from_raw_parts_mut(self.base, self.size)
49        }
50
51        pub fn size(&self) -> usize {
52            self.size
53        }
54
55        pub fn vmo(&self) -> &Arc<zx::Vmo> {
56            &self.vmo
57        }
58
59        #[allow(clippy::mut_from_ref)]
60        pub(super) unsafe fn sub_slice(&self, range: &Range<usize>) -> &mut [u8] {
61            assert!(range.start < self.size && range.end <= self.size);
62            unsafe {
63                std::slice::from_raw_parts_mut(self.base.add(range.start), range.end - range.start)
64            }
65        }
66
67        /// Commits the range in memory to avoid future page faults.
68        pub fn commit_range(&self, range: Range<usize>) -> Result<(), zx::Status> {
69            self.vmo.op_range(zx::VmoOp::COMMIT, range.start as u64, range.len() as u64)
70        }
71    }
72
73    impl Drop for BufferSource {
74        fn drop(&mut self) {
75            // SAFETY: This balances the `map` in `new` above.
76            unsafe {
77                let _ = vmar_root_self().unmap(self.base as usize, self.size);
78            }
79        }
80    }
81}
82
83#[cfg(not(target_os = "fuchsia"))]
84mod buffer_source {
85    use std::cell::UnsafeCell;
86    use std::ops::Range;
87    use std::pin::Pin;
88
89    /// A basic heap-backed buffer source.
90    #[derive(Debug)]
91    pub struct BufferSource {
92        // We use an UnsafeCell here because we need interior mutability of the buffer (to hand out
93        // mutable slices to it in |buffer()|), but don't want to pay the cost of wrapping the
94        // buffer in a Mutex. We must guarantee that the Buffer objects we hand out don't overlap,
95        // but that is already a requirement for correctness.
96        data: UnsafeCell<Pin<Vec<u8>>>,
97    }
98
99    // Safe because none of the fields in BufferSource are modified, except the contents of |data|,
100    // but that is managed by the BufferAllocator.
101    unsafe impl Sync for BufferSource {}
102
103    impl BufferSource {
104        pub fn new(size: usize) -> Self {
105            Self { data: UnsafeCell::new(Pin::new(vec![0 as u8; size])) }
106        }
107
108        pub fn size(&self) -> usize {
109            // Safe because the reference goes out of scope as soon as we use it.
110            unsafe { (&*self.data.get()).len() }
111        }
112
113        #[allow(clippy::mut_from_ref)]
114        pub(super) unsafe fn sub_slice(&self, range: &Range<usize>) -> &mut [u8] {
115            assert!(range.start < self.size() && range.end <= self.size());
116            unsafe { &mut (&mut *self.data.get())[range.start..range.end] }
117        }
118    }
119}
120
121pub use buffer_source::BufferSource;
122
123// Stores a list of offsets into a BufferSource. The size of the free ranges is determined by which
124// FreeList we are looking at.
125// FreeLists are sorted.
126type FreeList = Vec<usize>;
127
128#[derive(Debug)]
129struct Inner {
130    // The index corresponds to the order of free memory blocks in the free list.
131    free_lists: Vec<FreeList>,
132    // Maps offsets to allocated length (the actual length, not the size requested by the client).
133    allocation_map: BTreeMap<usize, usize>,
134}
135
136/// BufferAllocator creates Buffer objects to be used for block device I/O requests.
137///
138/// This is implemented through a simple buddy allocation scheme.
139#[derive(Debug)]
140pub struct BufferAllocator {
141    block_size: usize,
142    source: BufferSource,
143    inner: Mutex<Inner>,
144    event: Event,
145}
146
147// Returns the smallest order which is at least |size| bytes.
148fn order(size: usize, block_size: usize) -> usize {
149    if size <= block_size {
150        return 0;
151    }
152    let nblocks = round_up(size, block_size) / block_size;
153    nblocks.next_power_of_two().trailing_zeros() as usize
154}
155
156// Returns the largest order which is no more than |size| bytes.
157fn order_fit(size: usize, block_size: usize) -> usize {
158    assert!(size >= block_size);
159    let nblocks = round_up(size, block_size) / block_size;
160    if nblocks.is_power_of_two() {
161        nblocks.trailing_zeros() as usize
162    } else {
163        nblocks.next_power_of_two().trailing_zeros() as usize - 1
164    }
165}
166
167fn size_for_order(order: usize, block_size: usize) -> usize {
168    block_size * (1 << (order as u32))
169}
170
171fn initial_free_lists(size: usize, block_size: usize) -> Vec<FreeList> {
172    let size = round_down(size, block_size);
173    assert!(block_size <= size);
174    assert!(block_size.is_power_of_two());
175    let max_order = order_fit(size, block_size);
176    let mut free_lists = Vec::new();
177    for _ in 0..max_order + 1 {
178        free_lists.push(FreeList::new())
179    }
180    let mut offset = 0;
181    while offset < size {
182        let order = order_fit(size - offset, block_size);
183        let size = size_for_order(order, block_size);
184        free_lists[order].push(offset);
185        offset += size;
186    }
187    free_lists
188}
189
190/// A future which will resolve to an allocated [`Buffer`].
191pub struct BufferFuture<'a> {
192    allocator: &'a BufferAllocator,
193    size: usize,
194    listener: Option<EventListener>,
195}
196
197impl<'a> Future for BufferFuture<'a> {
198    type Output = Buffer<'a>;
199
200    fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
201        if let Some(listener) = self.listener.as_mut() {
202            futures::ready!(listener.poll_unpin(context));
203        }
204        // Loop because we need to deal with the case where `listener` is ready immediately upon
205        // creation, in which case we ought to retry the allocation.
206        loop {
207            match self.allocator.try_allocate_buffer(self.size) {
208                Ok(buffer) => return Poll::Ready(buffer),
209                Err(mut listener) => {
210                    if listener.poll_unpin(context).is_pending() {
211                        self.listener = Some(listener);
212                        return Poll::Pending;
213                    }
214                }
215            }
216        }
217    }
218}
219
220impl BufferAllocator {
221    pub fn new(block_size: usize, source: BufferSource) -> Self {
222        let free_lists = initial_free_lists(source.size(), block_size);
223        Self {
224            block_size,
225            source,
226            inner: Mutex::new(Inner { free_lists, allocation_map: BTreeMap::new() }),
227            event: Event::new(),
228        }
229    }
230
231    pub fn block_size(&self) -> usize {
232        self.block_size
233    }
234
235    pub fn buffer_source(&self) -> &BufferSource {
236        &self.source
237    }
238
239    /// Takes the buffer source from the allocator and consumes the allocator.
240    pub fn take_buffer_source(self) -> BufferSource {
241        self.source
242    }
243
244    /// Allocates a Buffer with capacity for |size| bytes. Panics if the allocation exceeds the pool
245    /// size.  Blocks until there are enough bytes available to satisfy the request.
246    ///
247    /// The allocated buffer will be block-aligned and the padding up to block alignment can also
248    /// be used by the buffer.
249    ///
250    /// Allocation is O(lg(N) + M), where N = size and M = number of allocations.
251    pub fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
252        BufferFuture { allocator: self, size, listener: None }
253    }
254
255    /// Like |allocate_buffer|, but returns an EventListener if the allocation cannot be satisfied.
256    /// The listener will signal when the caller should try again.
257    pub fn try_allocate_buffer(&self, size: usize) -> Result<Buffer<'_>, EventListener> {
258        if size > self.source.size() {
259            panic!("Allocation of {} bytes would exceed limit {}", size, self.source.size());
260        }
261        let mut inner = self.inner.lock();
262        let requested_order = order(size, self.block_size());
263        assert!(requested_order < inner.free_lists.len());
264        // Pick the smallest possible order with a free entry.
265        let mut order = {
266            let mut idx = requested_order;
267            loop {
268                if idx >= inner.free_lists.len() {
269                    return Err(self.event.listen());
270                }
271                if !inner.free_lists[idx].is_empty() {
272                    break idx;
273                }
274                idx += 1;
275            }
276        };
277
278        // Split the free region until it's the right size.
279        let offset = inner.free_lists[order].pop().unwrap();
280        while order > requested_order {
281            order -= 1;
282            assert!(inner.free_lists[order].is_empty());
283            inner.free_lists[order].push(offset + self.size_for_order(order));
284        }
285
286        inner.allocation_map.insert(offset, self.size_for_order(order));
287        let range = offset..offset + size;
288        log::debug!(range:?, bytes_used = self.size_for_order(order); "Allocated");
289
290        // Safety is ensured by the allocator not double-allocating any regions.
291        Ok(Buffer::new(unsafe { self.source.sub_slice(&range) }, range, &self))
292    }
293
294    /// Deallocation is O(lg(N) + M), where N = size and M = number of allocations.
295    #[doc(hidden)]
296    pub(super) fn free_buffer(&self, range: Range<usize>) {
297        let mut inner = self.inner.lock();
298        let mut offset = range.start;
299        let size = inner
300            .allocation_map
301            .remove(&offset)
302            .unwrap_or_else(|| panic!("No allocation record found for {:?}", range));
303        assert!(range.end - range.start <= size);
304        log::debug!(range:?, bytes_used = size; "Freeing");
305
306        // Merge as many free slots as we can.
307        let mut order = order(size, self.block_size());
308        while order < inner.free_lists.len() - 1 {
309            let buddy = self.find_buddy(offset, order);
310            let idx = if let Ok(idx) = inner.free_lists[order].binary_search(&buddy) {
311                idx
312            } else {
313                break;
314            };
315            inner.free_lists[order].remove(idx);
316            offset = std::cmp::min(offset, buddy);
317            order += 1;
318        }
319
320        let idx = inner.free_lists[order]
321            .binary_search(&offset)
322            .expect_err(&format!("Unexpectedly found {} in free list {}", offset, order));
323        inner.free_lists[order].insert(idx, offset);
324
325        // Notify all stuck tasks.  This might be inefficient, but it's simple and correct.
326        self.event.notify(usize::MAX);
327    }
328
329    fn size_for_order(&self, order: usize) -> usize {
330        size_for_order(order, self.block_size)
331    }
332
333    fn find_buddy(&self, offset: usize, order: usize) -> usize {
334        offset ^ self.size_for_order(order)
335    }
336}
337
338#[cfg(test)]
339mod tests {
340    use crate::buffer_allocator::{BufferAllocator, BufferSource, order};
341    use fuchsia_async as fasync;
342    use futures::future::join_all;
343    use futures::pin_mut;
344    use rand::seq::IndexedRandom;
345    use rand::{Rng, rng};
346    use std::sync::Arc;
347    use std::sync::atomic::{AtomicBool, Ordering};
348
349    #[fuchsia::test]
350    async fn test_odd_sized_buffer_source() {
351        let source = BufferSource::new(123);
352        let allocator = BufferAllocator::new(2, source);
353
354        // 123 == 64 + 32 + 16 + 8 + 2 + 1. (The last byte is unusable.)
355        let sizes = vec![64, 32, 16, 8, 2];
356        let mut bufs = vec![];
357        for size in sizes.iter() {
358            bufs.push(allocator.allocate_buffer(*size).await);
359        }
360        for (expected_size, buf) in sizes.iter().zip(bufs.iter()) {
361            assert_eq!(*expected_size, buf.len());
362        }
363        assert!(allocator.try_allocate_buffer(2).is_err());
364    }
365
366    #[fuchsia::test]
367    async fn test_allocate_buffer_read_write() {
368        let source = BufferSource::new(1024 * 1024);
369        let allocator = BufferAllocator::new(8192, source);
370
371        let mut buf = allocator.allocate_buffer(8192).await;
372        buf.as_mut_slice().fill(0xaa as u8);
373        let mut vec = vec![0 as u8; 8192];
374        vec.copy_from_slice(buf.as_slice());
375        assert_eq!(vec, vec![0xaa as u8; 8192]);
376    }
377
378    #[fuchsia::test]
379    async fn test_allocate_buffer_consecutive_calls_do_not_overlap() {
380        let source = BufferSource::new(1024 * 1024);
381        let allocator = BufferAllocator::new(8192, source);
382
383        let buf1 = allocator.allocate_buffer(8192).await;
384        let buf2 = allocator.allocate_buffer(8192).await;
385        assert!(buf1.range().end <= buf2.range().start || buf2.range().end <= buf1.range().start);
386    }
387
388    #[fuchsia::test]
389    async fn test_allocate_many_buffers() {
390        let source = BufferSource::new(1024 * 1024);
391        let allocator = BufferAllocator::new(8192, source);
392
393        for _ in 0..10 {
394            let _ = allocator.allocate_buffer(8192).await;
395        }
396    }
397
398    #[fuchsia::test]
399    async fn test_allocate_small_buffers_dont_overlap() {
400        let source = BufferSource::new(1024 * 1024);
401        let allocator = BufferAllocator::new(8192, source);
402
403        let buf1 = allocator.allocate_buffer(1).await;
404        let buf2 = allocator.allocate_buffer(1).await;
405        assert!(buf1.range().end <= buf2.range().start || buf2.range().end <= buf1.range().start);
406    }
407
408    #[fuchsia::test]
409    async fn test_allocate_large_buffer() {
410        let source = BufferSource::new(1024 * 1024);
411        let allocator = BufferAllocator::new(8192, source);
412
413        let mut buf = allocator.allocate_buffer(1024 * 1024).await;
414        assert_eq!(buf.len(), 1024 * 1024);
415        buf.as_mut_slice().fill(0xaa as u8);
416        let mut vec = vec![0 as u8; 1024 * 1024];
417        vec.copy_from_slice(buf.as_slice());
418        assert_eq!(vec, vec![0xaa as u8; 1024 * 1024]);
419    }
420
421    #[fuchsia::test]
422    async fn test_allocate_large_buffer_after_smaller_buffers() {
423        let source = BufferSource::new(1024 * 1024);
424        let allocator = BufferAllocator::new(8192, source);
425
426        {
427            let mut buffers = vec![];
428            while let Ok(buffer) = allocator.try_allocate_buffer(8192) {
429                buffers.push(buffer);
430            }
431        }
432        let buf = allocator.allocate_buffer(1024 * 1024).await;
433        assert_eq!(buf.len(), 1024 * 1024);
434    }
435
436    #[fuchsia::test]
437    async fn test_allocate_at_limits() {
438        let source = BufferSource::new(1024 * 1024);
439        let allocator = BufferAllocator::new(8192, source);
440
441        let mut buffers = vec![];
442        while let Ok(buffer) = allocator.try_allocate_buffer(8192) {
443            buffers.push(buffer);
444        }
445        // Deallocate a single buffer, and reallocate a single one back.
446        buffers.pop();
447        let buf = allocator.allocate_buffer(8192).await;
448        assert_eq!(buf.len(), 8192);
449    }
450
451    #[fuchsia::test(threads = 10)]
452    async fn test_random_allocs_deallocs() {
453        let source = BufferSource::new(16 * 1024 * 1024);
454        let bs = 512;
455        let allocator = Arc::new(BufferAllocator::new(bs, source));
456
457        join_all((0..10).map(|_| {
458            let allocator = allocator.clone();
459            fasync::Task::spawn(async move {
460                let mut rng = rng();
461                enum Op {
462                    Alloc,
463                    Dealloc,
464                }
465                let ops = vec![Op::Alloc, Op::Dealloc];
466                let mut buffers = vec![];
467                for _ in 0..1000 {
468                    match ops.choose(&mut rng).unwrap() {
469                        Op::Alloc => {
470                            // Rather than a uniform distribution 1..64K, first pick an order and
471                            // then pick a size within that. For example, we might pick order 3,
472                            // which would give us 8 * 512..16 * 512 as our possible range.
473                            // This way we don't bias towards larger allocations too much.
474                            let order: usize = rng.random_range(order(1, bs)..order(65536 + 1, bs));
475                            let size: usize = rng.random_range(
476                                bs * 2_usize.pow(order as u32)..bs * 2_usize.pow(order as u32 + 1),
477                            );
478                            if let Ok(mut buf) = allocator.try_allocate_buffer(size) {
479                                let val = rng.random::<u8>();
480                                buf.as_mut_slice().fill(val);
481                                for v in buf.as_slice() {
482                                    assert_eq!(v, &val);
483                                }
484                                buffers.push(buf);
485                            }
486                        }
487                        Op::Dealloc if !buffers.is_empty() => {
488                            let idx = rng.random_range(0..buffers.len());
489                            buffers.remove(idx);
490                        }
491                        _ => {}
492                    };
493                }
494            })
495        }))
496        .await;
497    }
498
499    #[fuchsia::test]
500    async fn test_buffer_refs() {
501        let source = BufferSource::new(1024 * 1024);
502        let allocator = BufferAllocator::new(512, source);
503
504        // Allocate one buffer first so that |buf| is not starting at offset 0. This helps catch
505        // bugs.
506        let _buf = allocator.allocate_buffer(512).await;
507        let mut buf = allocator.allocate_buffer(4096).await;
508        let base = buf.range().start;
509        {
510            let mut bref = buf.subslice_mut(1000..2000);
511            assert_eq!(bref.len(), 1000);
512            assert_eq!(bref.range(), base + 1000..base + 2000);
513            bref.as_mut_slice().fill(0xbb);
514            {
515                let mut bref2 = bref.reborrow().subslice_mut(0..100);
516                assert_eq!(bref2.len(), 100);
517                assert_eq!(bref2.range(), base + 1000..base + 1100);
518                bref2.as_mut_slice().fill(0xaa);
519            }
520            {
521                let mut bref2 = bref.reborrow().subslice_mut(900..1000);
522                assert_eq!(bref2.len(), 100);
523                assert_eq!(bref2.range(), base + 1900..base + 2000);
524                bref2.as_mut_slice().fill(0xcc);
525            }
526            assert_eq!(bref.as_slice()[..100], vec![0xaa; 100]);
527            assert_eq!(bref.as_slice()[100..900], vec![0xbb; 800]);
528
529            let bref = bref.subslice_mut(900..);
530            assert_eq!(bref.len(), 100);
531            assert_eq!(bref.as_slice(), vec![0xcc; 100]);
532        }
533        {
534            let bref = buf.as_ref();
535            assert_eq!(bref.len(), 4096);
536            assert_eq!(bref.range(), base..base + 4096);
537            assert_eq!(bref.as_slice()[0..1000], vec![0x00; 1000]);
538            {
539                let bref2 = bref.subslice(1000..2000);
540                assert_eq!(bref2.len(), 1000);
541                assert_eq!(bref2.range(), base + 1000..base + 2000);
542                assert_eq!(bref2.as_slice()[..100], vec![0xaa; 100]);
543                assert_eq!(bref2.as_slice()[100..900], vec![0xbb; 800]);
544                assert_eq!(bref2.as_slice()[900..1000], vec![0xcc; 100]);
545            }
546
547            let bref = bref.subslice(2048..);
548            assert_eq!(bref.len(), 2048);
549            assert_eq!(bref.as_slice(), vec![0x00; 2048]);
550        }
551    }
552
553    #[fuchsia::test]
554    async fn test_buffer_split() {
555        let source = BufferSource::new(1024 * 1024);
556        let allocator = BufferAllocator::new(512, source);
557
558        // Allocate one buffer first so that |buf| is not starting at offset 0. This helps catch
559        // bugs.
560        let _buf = allocator.allocate_buffer(512).await;
561        let mut buf = allocator.allocate_buffer(4096).await;
562        let base = buf.range().start;
563        {
564            let bref = buf.as_mut();
565            let (mut s1, mut s2) = bref.split_at_mut(2048);
566            assert_eq!(s1.len(), 2048);
567            assert_eq!(s1.range(), base..base + 2048);
568            s1.as_mut_slice().fill(0xaa);
569            assert_eq!(s2.len(), 2048);
570            assert_eq!(s2.range(), base + 2048..base + 4096);
571            s2.as_mut_slice().fill(0xbb);
572        }
573        {
574            let bref = buf.as_ref();
575            let (s1, s2) = bref.split_at(1);
576            let (s2, s3) = s2.split_at(2047);
577            let (s3, s4) = s3.split_at(0);
578            assert_eq!(s1.len(), 1);
579            assert_eq!(s1.range(), base..base + 1);
580            assert_eq!(s2.len(), 2047);
581            assert_eq!(s2.range(), base + 1..base + 2048);
582            assert_eq!(s3.len(), 0);
583            assert_eq!(s3.range(), base + 2048..base + 2048);
584            assert_eq!(s4.len(), 2048);
585            assert_eq!(s4.range(), base + 2048..base + 4096);
586            assert_eq!(s1.as_slice(), vec![0xaa; 1]);
587            assert_eq!(s2.as_slice(), vec![0xaa; 2047]);
588            assert_eq!(s3.as_slice(), &[] as &[u8]);
589            assert_eq!(s4.as_slice(), vec![0xbb; 2048]);
590        }
591    }
592
593    #[fuchsia::test]
594    async fn test_blocking_allocation() {
595        let source = BufferSource::new(1024 * 1024);
596        let allocator = Arc::new(BufferAllocator::new(512, source));
597
598        let buf1 = allocator.allocate_buffer(512 * 1024).await;
599        let buf2 = allocator.allocate_buffer(512 * 1024).await;
600        let bufs_dropped = Arc::new(AtomicBool::new(false));
601
602        // buf3_fut should block until both buf1 and buf2 are done.
603        let allocator_clone = allocator.clone();
604        let bufs_dropped_clone = bufs_dropped.clone();
605        let buf3_fut = async move {
606            allocator_clone.allocate_buffer(1024 * 1024).await;
607            assert!(bufs_dropped_clone.load(Ordering::Relaxed), "Allocation finished early");
608        };
609        pin_mut!(buf3_fut);
610
611        // Each of buf_futs should block until buf3_fut is done, and they should proceed in order.
612        let mut buf_futs = vec![];
613        for _ in 0..16 {
614            let allocator_clone = allocator.clone();
615            let bufs_dropped_clone = bufs_dropped.clone();
616            let fut = async move {
617                allocator_clone.allocate_buffer(64 * 1024).await;
618                // We can't say with certainty that buf3 proceeded first, nor can we ensure these
619                // allocations proceed in order, but we can make sure that at least buf1/buf2 were
620                // done (since they exhausted the pool).
621                assert!(bufs_dropped_clone.load(Ordering::Relaxed), "Allocation finished early");
622            };
623            buf_futs.push(fut);
624        }
625
626        futures::join!(buf3_fut, join_all(buf_futs), async move {
627            std::mem::drop(buf1);
628            std::mem::drop(buf2);
629            bufs_dropped.store(true, Ordering::Relaxed);
630        });
631    }
632}