storage_device/buffer_allocator.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::buffer::{Buffer, round_down, round_up};
use event_listener::{Event, EventListener};
use fuchsia_sync::Mutex;
use futures::{Future, FutureExt as _};
use std::collections::BTreeMap;
use std::ops::Range;
use std::pin::Pin;
use std::task::{Context, Poll};

#[cfg(target_os = "fuchsia")]
mod buffer_source {
    use fuchsia_runtime::vmar_root_self;
    use std::ops::Range;
    use zx::{self as zx, AsHandleRef};

    /// A buffer source backed by a VMO.
    #[derive(Debug)]
    pub struct BufferSource {
        base: *mut u8,
        size: usize,
        vmo: zx::Vmo,
    }

    // SAFETY: These impls are only needed because of the `*mut u8`, which is just the base
    // address of the VMO mapping and doesn't prevent BufferSource from being Send and Sync.
    unsafe impl Send for BufferSource {}
    unsafe impl Sync for BufferSource {}

    impl BufferSource {
        pub fn new(size: usize) -> Self {
            let vmo = zx::Vmo::create(size as u64).unwrap();
            let name = zx::Name::new("transfer-buf").unwrap();
            vmo.set_name(&name).unwrap();
            let flags = zx::VmarFlags::PERM_READ
                | zx::VmarFlags::PERM_WRITE
                | zx::VmarFlags::MAP_RANGE
                | zx::VmarFlags::REQUIRE_NON_RESIZABLE;
            let base = vmar_root_self().map(0, &vmo, 0, size, flags).unwrap() as *mut u8;
            Self { base, size, vmo }
        }

        pub fn size(&self) -> usize {
            self.size
        }

        pub fn vmo(&self) -> &zx::Vmo {
            &self.vmo
        }

        #[allow(clippy::mut_from_ref)]
        pub(super) unsafe fn sub_slice(&self, range: &Range<usize>) -> &mut [u8] {
            assert!(range.start < self.size && range.end <= self.size);
            unsafe {
                std::slice::from_raw_parts_mut(self.base.add(range.start), range.end - range.start)
            }
        }

        /// Commits the range in memory to avoid future page faults.
        pub fn commit_range(&self, range: Range<usize>) -> Result<(), zx::Status> {
            self.vmo.op_range(zx::VmoOp::COMMIT, range.start as u64, range.len() as u64)
        }
    }

    impl Drop for BufferSource {
        fn drop(&mut self) {
            // SAFETY: This balances the `map` in `new` above.
            unsafe {
                let _ = vmar_root_self().unmap(self.base as usize, self.size);
            }
        }
    }
}

#[cfg(not(target_os = "fuchsia"))]
mod buffer_source {
    use std::cell::UnsafeCell;
    use std::ops::Range;
    use std::pin::Pin;

    /// A basic heap-backed buffer source.
    #[derive(Debug)]
    pub struct BufferSource {
        // We use an UnsafeCell here because we need interior mutability of the buffer (to hand
        // out mutable slices to it in |sub_slice()|), but don't want to pay the cost of wrapping
        // the buffer in a Mutex. We must guarantee that the Buffer objects we hand out don't
        // overlap, but that is already a requirement for correctness.
        data: UnsafeCell<Pin<Vec<u8>>>,
    }

    // SAFETY: None of the fields in BufferSource are modified except the contents of |data|,
    // which is managed by the BufferAllocator.
    unsafe impl Sync for BufferSource {}

    impl BufferSource {
        pub fn new(size: usize) -> Self {
            Self { data: UnsafeCell::new(Pin::new(vec![0 as u8; size])) }
        }

        pub fn size(&self) -> usize {
            // Safe because the reference goes out of scope as soon as we use it.
            unsafe { (&*self.data.get()).len() }
        }

        #[allow(clippy::mut_from_ref)]
        pub(super) unsafe fn sub_slice(&self, range: &Range<usize>) -> &mut [u8] {
            assert!(range.start < self.size() && range.end <= self.size());
            unsafe { &mut (&mut *self.data.get())[range.start..range.end] }
        }
    }
}

pub use buffer_source::BufferSource;

// Stores a list of offsets into a BufferSource. The size of the free ranges is determined by which
// FreeList we are looking at.
// FreeLists are sorted.
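// For example, with a block size of 512, free_lists[3] (see Inner below) holds the start offsets
// of the free regions that are 8 blocks (4096 bytes) long.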
type FreeList = Vec<usize>;

#[derive(Debug)]
struct Inner {
    // The index into |free_lists| is the order; each FreeList holds the offsets of the free
    // blocks of that order.
    free_lists: Vec<FreeList>,
    // Maps offsets to allocated length (the actual length, not the size requested by the client).
    allocation_map: BTreeMap<usize, usize>,
}

/// BufferAllocator creates Buffer objects to be used for block device I/O requests.
///
/// This is implemented through a simple buddy allocation scheme.
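///
/// Allocations are rounded up to a power-of-two multiple of the block size (the "order") and
/// served from per-order free lists; freed blocks are merged back with their buddy whenever
/// possible.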
#[derive(Debug)]
pub struct BufferAllocator {
    block_size: usize,
    source: BufferSource,
    inner: Mutex<Inner>,
    event: Event,
}

// Returns the smallest order whose size is at least |size| bytes.
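// For example, with a block size of 512: order(1, 512) == 0, order(512, 512) == 0,
// order(513, 512) == 1 and order(2048, 512) == 2.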
fn order(size: usize, block_size: usize) -> usize {
    if size <= block_size {
        return 0;
    }
    let nblocks = round_up(size, block_size) / block_size;
    nblocks.next_power_of_two().trailing_zeros() as usize
}

// Returns the largest order whose size is no more than |size| bytes.
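// For example, with a block size of 512: order_fit(512, 512) == 0, order_fit(1024, 512) == 1 and
// order_fit(1536, 512) == 1 (three blocks round down to a two-block region).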
fn order_fit(size: usize, block_size: usize) -> usize {
    assert!(size >= block_size);
    let nblocks = round_up(size, block_size) / block_size;
    if nblocks.is_power_of_two() {
        nblocks.trailing_zeros() as usize
    } else {
        nblocks.next_power_of_two().trailing_zeros() as usize - 1
    }
}

fn size_for_order(order: usize, block_size: usize) -> usize {
    block_size * (1 << (order as u32))
}

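// Carves the source up into the largest possible power-of-two regions and seeds the free lists
// with them. For example, a 123-byte source with a block size of 2 yields free regions of 64, 32,
// 16, 8 and 2 bytes (the final odd byte is unusable).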
fn initial_free_lists(size: usize, block_size: usize) -> Vec<FreeList> {
    let size = round_down(size, block_size);
    assert!(block_size <= size);
    assert!(block_size.is_power_of_two());
    let max_order = order_fit(size, block_size);
    let mut free_lists = Vec::new();
    for _ in 0..max_order + 1 {
        free_lists.push(FreeList::new())
    }
    let mut offset = 0;
    while offset < size {
        let order = order_fit(size - offset, block_size);
        let size = size_for_order(order, block_size);
        free_lists[order].push(offset);
        offset += size;
    }
    free_lists
}

/// A future which will resolve to an allocated [`Buffer`].
pub struct BufferFuture<'a> {
    allocator: &'a BufferAllocator,
    size: usize,
    listener: Option<EventListener>,
}

impl<'a> Future for BufferFuture<'a> {
    type Output = Buffer<'a>;

    fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        if let Some(listener) = self.listener.as_mut() {
            futures::ready!(listener.poll_unpin(context));
        }
        // Loop because we need to deal with the case where `listener` is ready immediately upon
        // creation, in which case we ought to retry the allocation.
        loop {
            match self.allocator.try_allocate_buffer(self.size) {
                Ok(buffer) => return Poll::Ready(buffer),
                Err(mut listener) => {
                    if listener.poll_unpin(context).is_pending() {
                        self.listener = Some(listener);
                        return Poll::Pending;
                    }
                }
            }
        }
    }
}

impl BufferAllocator {
    pub fn new(block_size: usize, source: BufferSource) -> Self {
        let free_lists = initial_free_lists(source.size(), block_size);
        Self {
            block_size,
            source,
            inner: Mutex::new(Inner { free_lists, allocation_map: BTreeMap::new() }),
            event: Event::new(),
        }
    }

    pub fn block_size(&self) -> usize {
        self.block_size
    }

    pub fn buffer_source(&self) -> &BufferSource {
        &self.source
    }

    /// Takes the buffer source from the allocator and consumes the allocator.
    pub fn take_buffer_source(self) -> BufferSource {
        self.source
    }

    /// Allocates a Buffer with capacity for |size| bytes. Panics if |size| exceeds the pool size.
    /// The returned future resolves once enough bytes are available to satisfy the request.
    ///
    /// The allocated buffer will be block-aligned and the padding up to block alignment can also
    /// be used by the buffer.
    ///
    /// Allocation is O(lg(N) + M), where N = size and M = number of allocations.
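    ///
    /// # Example
    ///
    /// A minimal sketch (mirroring the tests below):
    ///
    /// ```ignore
    /// let allocator = BufferAllocator::new(512, BufferSource::new(1024 * 1024));
    /// let mut buf = allocator.allocate_buffer(8192).await;
    /// buf.as_mut_slice().fill(0xaa);
    /// ```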
    pub fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
        BufferFuture { allocator: self, size, listener: None }
    }

    /// Like |allocate_buffer|, but returns an EventListener if the allocation cannot be satisfied.
    /// The listener will signal when the caller should try again.
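    ///
    /// # Example
    ///
    /// A minimal sketch of the retry loop (essentially what [`BufferFuture`] does when polled):
    ///
    /// ```ignore
    /// let buffer = loop {
    ///     match allocator.try_allocate_buffer(8192) {
    ///         Ok(buffer) => break buffer,
    ///         Err(listener) => listener.await,
    ///     }
    /// };
    /// ```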
    pub fn try_allocate_buffer(&self, size: usize) -> Result<Buffer<'_>, EventListener> {
        if size > self.source.size() {
            panic!("Allocation of {} bytes would exceed limit {}", size, self.source.size());
        }
        let mut inner = self.inner.lock();
        let requested_order = order(size, self.block_size());
        assert!(requested_order < inner.free_lists.len());
        // Pick the smallest possible order with a free entry.
        let mut order = {
            let mut idx = requested_order;
            loop {
                if idx >= inner.free_lists.len() {
                    return Err(self.event.listen());
                }
                if !inner.free_lists[idx].is_empty() {
                    break idx;
                }
                idx += 1;
            }
        };

        // Split the free region until it's the right size.
        let offset = inner.free_lists[order].pop().unwrap();
        while order > requested_order {
            order -= 1;
            assert!(inner.free_lists[order].is_empty());
            inner.free_lists[order].push(offset + self.size_for_order(order));
        }

        inner.allocation_map.insert(offset, self.size_for_order(order));
        let range = offset..offset + size;
        log::debug!(range:?, bytes_used = self.size_for_order(order); "Allocated");

        // Safety is ensured by the allocator not double-allocating any regions.
        Ok(Buffer::new(unsafe { self.source.sub_slice(&range) }, range, &self))
    }

    /// Deallocation is O(lg(N) + M), where N = size and M = number of allocations.
    #[doc(hidden)]
    pub(super) fn free_buffer(&self, range: Range<usize>) {
        let mut inner = self.inner.lock();
        let mut offset = range.start;
        let size = inner
            .allocation_map
            .remove(&offset)
            .unwrap_or_else(|| panic!("No allocation record found for {:?}", range));
        assert!(range.end - range.start <= size);
        log::debug!(range:?, bytes_used = size; "Freeing");

        // Merge as many free slots as we can.
        let mut order = order(size, self.block_size());
        while order < inner.free_lists.len() - 1 {
            let buddy = self.find_buddy(offset, order);
            let idx = if let Ok(idx) = inner.free_lists[order].binary_search(&buddy) {
                idx
            } else {
                break;
            };
            inner.free_lists[order].remove(idx);
            offset = std::cmp::min(offset, buddy);
            order += 1;
        }

        let idx = inner.free_lists[order]
            .binary_search(&offset)
            .expect_err(&format!("Unexpectedly found {} in free list {}", offset, order));
        inner.free_lists[order].insert(idx, offset);

        // Notify all stuck tasks.  This might be inefficient, but it's simple and correct.
        self.event.notify(usize::MAX);
    }

    fn size_for_order(&self, order: usize) -> usize {
        size_for_order(order, self.block_size)
    }

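    // A block's buddy is the neighboring block of the same order that it can be merged with; it
    // is found by flipping the bit corresponding to the block's size. For example, with a block
    // size of 512, find_buddy(0, 0) == 512 and find_buddy(512, 0) == 0.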
    fn find_buddy(&self, offset: usize, order: usize) -> usize {
        offset ^ self.size_for_order(order)
    }
}

#[cfg(test)]
mod tests {
    use crate::buffer_allocator::{BufferAllocator, BufferSource, order};
    use fuchsia_async as fasync;
    use futures::future::join_all;
    use futures::pin_mut;
    use rand::seq::IndexedRandom;
    use rand::{Rng, rng};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    #[fuchsia::test]
    async fn test_odd_sized_buffer_source() {
        let source = BufferSource::new(123);
        let allocator = BufferAllocator::new(2, source);

        // 123 == 64 + 32 + 16 + 8 + 2 + 1. (The last byte is unusable.)
        let sizes = vec![64, 32, 16, 8, 2];
        let mut bufs = vec![];
        for size in sizes.iter() {
            bufs.push(allocator.allocate_buffer(*size).await);
        }
        for (expected_size, buf) in sizes.iter().zip(bufs.iter()) {
            assert_eq!(*expected_size, buf.len());
        }
        assert!(allocator.try_allocate_buffer(2).is_err());
    }

    #[fuchsia::test]
    async fn test_allocate_buffer_read_write() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let mut buf = allocator.allocate_buffer(8192).await;
        buf.as_mut_slice().fill(0xaa as u8);
        let mut vec = vec![0 as u8; 8192];
        vec.copy_from_slice(buf.as_slice());
        assert_eq!(vec, vec![0xaa as u8; 8192]);
    }

    #[fuchsia::test]
    async fn test_allocate_buffer_consecutive_calls_do_not_overlap() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let buf1 = allocator.allocate_buffer(8192).await;
        let buf2 = allocator.allocate_buffer(8192).await;
        assert!(buf1.range().end <= buf2.range().start || buf2.range().end <= buf1.range().start);
    }

    #[fuchsia::test]
    async fn test_allocate_many_buffers() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        for _ in 0..10 {
            let _ = allocator.allocate_buffer(8192).await;
        }
    }

    #[fuchsia::test]
    async fn test_allocate_small_buffers_dont_overlap() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let buf1 = allocator.allocate_buffer(1).await;
        let buf2 = allocator.allocate_buffer(1).await;
        assert!(buf1.range().end <= buf2.range().start || buf2.range().end <= buf1.range().start);
    }

    #[fuchsia::test]
    async fn test_allocate_large_buffer() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let mut buf = allocator.allocate_buffer(1024 * 1024).await;
        assert_eq!(buf.len(), 1024 * 1024);
        buf.as_mut_slice().fill(0xaa as u8);
        let mut vec = vec![0 as u8; 1024 * 1024];
        vec.copy_from_slice(buf.as_slice());
        assert_eq!(vec, vec![0xaa as u8; 1024 * 1024]);
    }

    #[fuchsia::test]
    async fn test_allocate_large_buffer_after_smaller_buffers() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        {
            let mut buffers = vec![];
            while let Ok(buffer) = allocator.try_allocate_buffer(8192) {
                buffers.push(buffer);
            }
        }
        let buf = allocator.allocate_buffer(1024 * 1024).await;
        assert_eq!(buf.len(), 1024 * 1024);
    }

    #[fuchsia::test]
    async fn test_allocate_at_limits() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let mut buffers = vec![];
        while let Ok(buffer) = allocator.try_allocate_buffer(8192) {
            buffers.push(buffer);
        }
        // Deallocate a single buffer, and reallocate a single one back.
        buffers.pop();
        let buf = allocator.allocate_buffer(8192).await;
        assert_eq!(buf.len(), 8192);
    }

    #[fuchsia::test(threads = 10)]
    async fn test_random_allocs_deallocs() {
        let source = BufferSource::new(16 * 1024 * 1024);
        let bs = 512;
        let allocator = Arc::new(BufferAllocator::new(bs, source));

        join_all((0..10).map(|_| {
            let allocator = allocator.clone();
            fasync::Task::spawn(async move {
                let mut rng = rng();
                enum Op {
                    Alloc,
                    Dealloc,
                }
                let ops = vec![Op::Alloc, Op::Dealloc];
                let mut buffers = vec![];
                for _ in 0..1000 {
                    match ops.choose(&mut rng).unwrap() {
                        Op::Alloc => {
                            // Rather than a uniform distribution 1..64K, first pick an order and
                            // then pick a size within that. For example, we might pick order 3,
                            // which would give us 8 * 512..16 * 512 as our possible range.
                            // This way we don't bias towards larger allocations too much.
                            let order: usize = rng.random_range(order(1, bs)..order(65536 + 1, bs));
                            let size: usize = rng.random_range(
                                bs * 2_usize.pow(order as u32)..bs * 2_usize.pow(order as u32 + 1),
                            );
                            if let Ok(mut buf) = allocator.try_allocate_buffer(size) {
                                let val = rng.random::<u8>();
                                buf.as_mut_slice().fill(val);
                                for v in buf.as_slice() {
                                    assert_eq!(v, &val);
                                }
                                buffers.push(buf);
                            }
                        }
                        Op::Dealloc if !buffers.is_empty() => {
                            let idx = rng.random_range(0..buffers.len());
                            buffers.remove(idx);
                        }
                        _ => {}
                    };
                }
            })
        }))
        .await;
    }

    #[fuchsia::test]
    async fn test_buffer_refs() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(512, source);

        // Allocate one buffer first so that |buf| is not starting at offset 0. This helps catch
        // bugs.
        let _buf = allocator.allocate_buffer(512).await;
        let mut buf = allocator.allocate_buffer(4096).await;
        let base = buf.range().start;
        {
            let mut bref = buf.subslice_mut(1000..2000);
            assert_eq!(bref.len(), 1000);
            assert_eq!(bref.range(), base + 1000..base + 2000);
            bref.as_mut_slice().fill(0xbb);
            {
                let mut bref2 = bref.reborrow().subslice_mut(0..100);
                assert_eq!(bref2.len(), 100);
                assert_eq!(bref2.range(), base + 1000..base + 1100);
                bref2.as_mut_slice().fill(0xaa);
            }
            {
                let mut bref2 = bref.reborrow().subslice_mut(900..1000);
                assert_eq!(bref2.len(), 100);
                assert_eq!(bref2.range(), base + 1900..base + 2000);
                bref2.as_mut_slice().fill(0xcc);
            }
            assert_eq!(bref.as_slice()[..100], vec![0xaa; 100]);
            assert_eq!(bref.as_slice()[100..900], vec![0xbb; 800]);

            let bref = bref.subslice_mut(900..);
            assert_eq!(bref.len(), 100);
            assert_eq!(bref.as_slice(), vec![0xcc; 100]);
        }
        {
            let bref = buf.as_ref();
            assert_eq!(bref.len(), 4096);
            assert_eq!(bref.range(), base..base + 4096);
            assert_eq!(bref.as_slice()[0..1000], vec![0x00; 1000]);
            {
                let bref2 = bref.subslice(1000..2000);
                assert_eq!(bref2.len(), 1000);
                assert_eq!(bref2.range(), base + 1000..base + 2000);
                assert_eq!(bref2.as_slice()[..100], vec![0xaa; 100]);
                assert_eq!(bref2.as_slice()[100..900], vec![0xbb; 800]);
                assert_eq!(bref2.as_slice()[900..1000], vec![0xcc; 100]);
            }

            let bref = bref.subslice(2048..);
            assert_eq!(bref.len(), 2048);
            assert_eq!(bref.as_slice(), vec![0x00; 2048]);
        }
    }

    #[fuchsia::test]
    async fn test_buffer_split() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(512, source);

        // Allocate one buffer first so that |buf| is not starting at offset 0. This helps catch
        // bugs.
        let _buf = allocator.allocate_buffer(512).await;
        let mut buf = allocator.allocate_buffer(4096).await;
        let base = buf.range().start;
        {
            let bref = buf.as_mut();
            let (mut s1, mut s2) = bref.split_at_mut(2048);
            assert_eq!(s1.len(), 2048);
            assert_eq!(s1.range(), base..base + 2048);
            s1.as_mut_slice().fill(0xaa);
            assert_eq!(s2.len(), 2048);
            assert_eq!(s2.range(), base + 2048..base + 4096);
            s2.as_mut_slice().fill(0xbb);
        }
        {
            let bref = buf.as_ref();
            let (s1, s2) = bref.split_at(1);
            let (s2, s3) = s2.split_at(2047);
            let (s3, s4) = s3.split_at(0);
            assert_eq!(s1.len(), 1);
            assert_eq!(s1.range(), base..base + 1);
            assert_eq!(s2.len(), 2047);
            assert_eq!(s2.range(), base + 1..base + 2048);
            assert_eq!(s3.len(), 0);
            assert_eq!(s3.range(), base + 2048..base + 2048);
            assert_eq!(s4.len(), 2048);
            assert_eq!(s4.range(), base + 2048..base + 4096);
            assert_eq!(s1.as_slice(), vec![0xaa; 1]);
            assert_eq!(s2.as_slice(), vec![0xaa; 2047]);
            assert_eq!(s3.as_slice(), &[] as &[u8]);
            assert_eq!(s4.as_slice(), vec![0xbb; 2048]);
        }
    }

    #[fuchsia::test]
    async fn test_blocking_allocation() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = Arc::new(BufferAllocator::new(512, source));

        let buf1 = allocator.allocate_buffer(512 * 1024).await;
        let buf2 = allocator.allocate_buffer(512 * 1024).await;
        let bufs_dropped = Arc::new(AtomicBool::new(false));

        // buf3_fut should block until both buf1 and buf2 are done.
        let allocator_clone = allocator.clone();
        let bufs_dropped_clone = bufs_dropped.clone();
        let buf3_fut = async move {
            allocator_clone.allocate_buffer(1024 * 1024).await;
            assert!(bufs_dropped_clone.load(Ordering::Relaxed), "Allocation finished early");
        };
        pin_mut!(buf3_fut);

        // Each of buf_futs should block until buf3_fut is done, and they should proceed in order.
        let mut buf_futs = vec![];
        for _ in 0..16 {
            let allocator_clone = allocator.clone();
            let bufs_dropped_clone = bufs_dropped.clone();
            let fut = async move {
                allocator_clone.allocate_buffer(64 * 1024).await;
                // We can't say with certainty that buf3 proceeded first, nor can we ensure these
                // allocations proceed in order, but we can make sure that at least buf1/buf2 were
                // done (since they exhausted the pool).
                assert!(bufs_dropped_clone.load(Ordering::Relaxed), "Allocation finished early");
            };
            buf_futs.push(fut);
        }

        futures::join!(buf3_fut, join_all(buf_futs), async move {
            std::mem::drop(buf1);
            std::mem::drop(buf2);
            bufs_dropped.store(true, Ordering::Relaxed);
        });
    }
}