storage_device/buffer_allocator.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::buffer::{round_down, round_up, Buffer};
use event_listener::{Event, EventListener};
use futures::{Future, FutureExt as _};
use std::collections::BTreeMap;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Mutex;
use std::task::{Context, Poll};

#[cfg(target_os = "fuchsia")]
mod buffer_source {
    use fuchsia_runtime::vmar_root_self;
    use std::ops::Range;
    use zx::{self as zx, AsHandleRef};

    /// A buffer source backed by a VMO.
    #[derive(Debug)]
    pub struct BufferSource {
        base: *mut u8,
        size: usize,
        vmo: zx::Vmo,
    }

    // SAFETY: These impls are needed because of the *mut u8 field, which is just the base address
    // of the VMO mapping and doesn't prevent BufferSource from being Send and Sync.
    unsafe impl Send for BufferSource {}
    unsafe impl Sync for BufferSource {}

    impl BufferSource {
        pub fn new(size: usize) -> Self {
            let vmo = zx::Vmo::create(size as u64).unwrap();
            let name = zx::Name::new("transfer-buf").unwrap();
            vmo.set_name(&name).unwrap();
            let flags = zx::VmarFlags::PERM_READ
                | zx::VmarFlags::PERM_WRITE
                | zx::VmarFlags::MAP_RANGE
                | zx::VmarFlags::REQUIRE_NON_RESIZABLE;
            let base = vmar_root_self().map(0, &vmo, 0, size, flags).unwrap() as *mut u8;
            Self { base, size, vmo }
        }

        pub fn size(&self) -> usize {
            self.size
        }

        pub fn vmo(&self) -> &zx::Vmo {
            &self.vmo
        }

        #[allow(clippy::mut_from_ref)]
        pub(super) unsafe fn sub_slice(&self, range: &Range<usize>) -> &mut [u8] {
            assert!(range.start < self.size && range.end <= self.size);
            std::slice::from_raw_parts_mut(self.base.add(range.start), range.end - range.start)
        }

        /// Commits the range in memory to avoid future page faults.
        pub fn commit_range(&self, range: Range<usize>) -> Result<(), zx::Status> {
            self.vmo.op_range(zx::VmoOp::COMMIT, range.start as u64, range.len() as u64)
        }
    }

    impl Drop for BufferSource {
        fn drop(&mut self) {
            // SAFETY: This balances the `map` in `new` above.
            unsafe {
                let _ = vmar_root_self().unmap(self.base as usize, self.size);
            }
        }
    }
}

#[cfg(not(target_os = "fuchsia"))]
mod buffer_source {
    use std::cell::UnsafeCell;
    use std::ops::Range;
    use std::pin::Pin;

    /// A basic heap-backed buffer source.
    #[derive(Debug)]
    pub struct BufferSource {
        // We use an UnsafeCell here because we need interior mutability of the buffer (to hand out
        // mutable slices to it in |sub_slice()|), but don't want to pay the cost of wrapping the
        // buffer in a Mutex. We must guarantee that the Buffer objects we hand out don't overlap,
        // but that is already a requirement for correctness.
        data: UnsafeCell<Pin<Vec<u8>>>,
    }

    // SAFETY: None of the fields in BufferSource are modified except the contents of |data|, and
    // those accesses are managed by the BufferAllocator.
    unsafe impl Sync for BufferSource {}

    impl BufferSource {
        pub fn new(size: usize) -> Self {
            Self { data: UnsafeCell::new(Pin::new(vec![0 as u8; size])) }
        }

        pub fn size(&self) -> usize {
            // Safe because the reference goes out of scope as soon as we use it.
            unsafe { (&*self.data.get()).len() }
        }

        #[allow(clippy::mut_from_ref)]
        pub(super) unsafe fn sub_slice(&self, range: &Range<usize>) -> &mut [u8] {
            assert!(range.start < self.size() && range.end <= self.size());
            &mut (&mut *self.data.get())[range.start..range.end]
        }
    }
}

pub use buffer_source::BufferSource;

// A FreeList stores a list of offsets into a BufferSource. The size of the free ranges is
// determined by which FreeList we are looking at (see Inner::free_lists below).
// FreeLists are kept sorted.
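// For example, with a 512-byte block size, an entry of 4096 in free_lists[3] (see Inner below)
// describes a free 4096-byte (512 << 3) region starting at offset 4096 in the BufferSource.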
type FreeList = Vec<usize>;

#[derive(Debug)]
struct Inner {
    // The index into the outer Vec is the order of the free blocks stored in that list.
    free_lists: Vec<FreeList>,
    // Maps offsets to allocated length (the actual length, not the size requested by the client).
    allocation_map: BTreeMap<usize, usize>,
}

/// BufferAllocator creates Buffer objects to be used for block device I/O requests.
///
/// This is implemented through a simple buddy allocation scheme.
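///
/// # Example
///
/// A minimal usage sketch (the block and buffer sizes here are illustrative, and executor setup
/// and error handling are omitted):
///
/// ```ignore
/// let source = BufferSource::new(1024 * 1024);
/// let allocator = BufferAllocator::new(8192, source);
/// // The returned future resolves once the request can be satisfied.
/// let mut buf = allocator.allocate_buffer(8192).await;
/// buf.as_mut_slice().fill(0xaa);
/// // Dropping the buffer returns its space to the pool.
/// drop(buf);
/// ```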
#[derive(Debug)]
pub struct BufferAllocator {
    block_size: usize,
    source: BufferSource,
    inner: Mutex<Inner>,
    event: Event,
}

// Returns the smallest order whose block size is at least |size| bytes.
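// For example, with a 512-byte block size, order(4096, 512) == 3, since 4096 == 512 << 3.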
fn order(size: usize, block_size: usize) -> usize {
    if size <= block_size {
        return 0;
    }
    let nblocks = round_up(size, block_size) / block_size;
    nblocks.next_power_of_two().trailing_zeros() as usize
}

// Returns the largest order whose block size is no more than |size| bytes.
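// For example, with a 512-byte block size, order_fit(3 * 512, 512) == 1, since an order-1 block
// (1024 bytes) fits in 1536 bytes but an order-2 block (2048 bytes) does not.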
fn order_fit(size: usize, block_size: usize) -> usize {
    assert!(size >= block_size);
    let nblocks = round_up(size, block_size) / block_size;
    if nblocks.is_power_of_two() {
        nblocks.trailing_zeros() as usize
    } else {
        nblocks.next_power_of_two().trailing_zeros() as usize - 1
    }
}

fn size_for_order(order: usize, block_size: usize) -> usize {
    block_size * (1 << (order as u32))
}

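// Carves the buffer source up into the largest power-of-two-block-sized chunks that fit, seeding
// the free list of the corresponding order with each chunk. For example, a 123-byte source with a
// 2-byte block size yields free chunks of 64 + 32 + 16 + 8 + 2 bytes (the final odd byte is
// unusable).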
fn initial_free_lists(size: usize, block_size: usize) -> Vec<FreeList> {
    let size = round_down(size, block_size);
    assert!(block_size <= size);
    assert!(block_size.is_power_of_two());
    let max_order = order_fit(size, block_size);
    let mut free_lists = Vec::new();
    for _ in 0..max_order + 1 {
        free_lists.push(FreeList::new())
    }
    let mut offset = 0;
    while offset < size {
        let order = order_fit(size - offset, block_size);
        let size = size_for_order(order, block_size);
        free_lists[order].push(offset);
        offset += size;
    }
    free_lists
}

/// A future which will resolve to an allocated [`Buffer`].
pub struct BufferFuture<'a> {
    allocator: &'a BufferAllocator,
    size: usize,
    listener: Option<EventListener>,
}

impl<'a> Future for BufferFuture<'a> {
    type Output = Buffer<'a>;

    fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        if let Some(listener) = self.listener.as_mut() {
            futures::ready!(listener.poll_unpin(context));
        }
        // Loop because we need to deal with the case where `listener` is ready immediately upon
        // creation, in which case we ought to retry the allocation.
        loop {
            match self.allocator.try_allocate_buffer(self.size) {
                Ok(buffer) => return Poll::Ready(buffer),
                Err(mut listener) => {
                    if listener.poll_unpin(context).is_pending() {
                        self.listener = Some(listener);
                        return Poll::Pending;
                    }
                }
            }
        }
    }
}

impl BufferAllocator {
    pub fn new(block_size: usize, source: BufferSource) -> Self {
        let free_lists = initial_free_lists(source.size(), block_size);
        Self {
            block_size,
            source,
            inner: Mutex::new(Inner { free_lists, allocation_map: BTreeMap::new() }),
            event: Event::new(),
        }
    }

    pub fn block_size(&self) -> usize {
        self.block_size
    }

    pub fn buffer_source(&self) -> &BufferSource {
        &self.source
    }

    /// Takes the buffer source from the allocator and consumes the allocator.
    pub fn take_buffer_source(self) -> BufferSource {
        self.source
    }

    /// Allocates a Buffer with capacity for |size| bytes. Panics if the requested size exceeds
    /// the pool size. The returned future resolves only once enough space is available to satisfy
    /// the request.
    ///
    /// The allocated buffer will be block-aligned and the padding up to block alignment can also
    /// be used by the buffer.
    ///
    /// Allocation is O(lg(N) + M), where N = size and M = number of allocations.
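    ///
    /// # Example
    ///
    /// A minimal sketch, assuming an async context (sizes are illustrative):
    ///
    /// ```ignore
    /// let allocator = BufferAllocator::new(8192, BufferSource::new(1024 * 1024));
    /// let mut buf = allocator.allocate_buffer(8192).await;
    /// assert_eq!(buf.len(), 8192);
    /// buf.as_mut_slice().fill(0xaa);
    /// ```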
    pub fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
        BufferFuture { allocator: self, size, listener: None }
    }

    /// Like |allocate_buffer|, but returns an EventListener if the allocation cannot be satisfied.
    /// The listener will signal when the caller should try again.
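    ///
    /// # Example
    ///
    /// A sketch of manually retrying a failed allocation, assuming an async context (the size is
    /// illustrative); this mirrors what `BufferFuture` does internally:
    ///
    /// ```ignore
    /// let buf = loop {
    ///     match allocator.try_allocate_buffer(8192) {
    ///         Ok(buf) => break buf,
    ///         // Wait until another buffer has been freed, then retry.
    ///         Err(listener) => listener.await,
    ///     }
    /// };
    /// ```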
    pub fn try_allocate_buffer(&self, size: usize) -> Result<Buffer<'_>, EventListener> {
        if size > self.source.size() {
            panic!("Allocation of {} bytes would exceed limit {}", size, self.source.size());
        }
        let mut inner = self.inner.lock().unwrap();
        let requested_order = order(size, self.block_size());
        assert!(requested_order < inner.free_lists.len());
        // Pick the smallest possible order with a free entry.
        let mut order = {
            let mut idx = requested_order;
            loop {
                if idx >= inner.free_lists.len() {
                    return Err(self.event.listen());
                }
                if !inner.free_lists[idx].is_empty() {
                    break idx;
                }
                idx += 1;
            }
        };

        // Split the free region until it's the right size.
        let offset = inner.free_lists[order].pop().unwrap();
        while order > requested_order {
            order -= 1;
            assert!(inner.free_lists[order].is_empty());
            inner.free_lists[order].push(offset + self.size_for_order(order));
        }

        inner.allocation_map.insert(offset, self.size_for_order(order));
        let range = offset..offset + size;
        log::debug!(range:?, bytes_used = self.size_for_order(order); "Allocated");

        // Safety is ensured by the allocator not double-allocating any regions.
        Ok(Buffer::new(unsafe { self.source.sub_slice(&range) }, range, &self))
    }

    /// Deallocation is O(lg(N) + M), where N = size and M = number of allocations.
    #[doc(hidden)]
    pub(super) fn free_buffer(&self, range: Range<usize>) {
        let mut inner = self.inner.lock().unwrap();
        let mut offset = range.start;
        let size = inner
            .allocation_map
            .remove(&offset)
            .unwrap_or_else(|| panic!("No allocation record found for {:?}", range));
        assert!(range.end - range.start <= size);
        log::debug!(range:?, bytes_used = size; "Freeing");

        // Merge as many free slots as we can.
        let mut order = order(size, self.block_size());
        while order < inner.free_lists.len() - 1 {
            let buddy = self.find_buddy(offset, order);
            let idx = if let Ok(idx) = inner.free_lists[order].binary_search(&buddy) {
                idx
            } else {
                break;
            };
            inner.free_lists[order].remove(idx);
            offset = std::cmp::min(offset, buddy);
            order += 1;
        }

        let idx = inner.free_lists[order]
            .binary_search(&offset)
            .expect_err(&format!("Unexpectedly found {} in free list {}", offset, order));
        inner.free_lists[order].insert(idx, offset);

        // Notify all stuck tasks.  This might be inefficient, but it's simple and correct.
        self.event.notify(usize::MAX);
    }

    fn size_for_order(&self, order: usize) -> usize {
        size_for_order(order, self.block_size)
    }

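    // A block's buddy is the adjacent block of the same order that it was (or would be) split
    // from, found by flipping the offset bit corresponding to the block's size. For example, with
    // a 512-byte block size, the order-0 buddy of offset 512 is offset 0, and vice versa.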
    fn find_buddy(&self, offset: usize, order: usize) -> usize {
        offset ^ self.size_for_order(order)
    }
}

#[cfg(test)]
mod tests {
    use crate::buffer_allocator::{order, BufferAllocator, BufferSource};
    use fuchsia_async as fasync;
    use futures::future::join_all;
    use futures::pin_mut;
    use rand::prelude::SliceRandom;
    use rand::{thread_rng, Rng};
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    #[fuchsia::test]
    async fn test_odd_sized_buffer_source() {
        let source = BufferSource::new(123);
        let allocator = BufferAllocator::new(2, source);

        // 123 == 64 + 32 + 16 + 8 + 2 + 1. (The last byte is unusable.)
        let sizes = vec![64, 32, 16, 8, 2];
        let mut bufs = vec![];
        for size in sizes.iter() {
            bufs.push(allocator.allocate_buffer(*size).await);
        }
        for (expected_size, buf) in sizes.iter().zip(bufs.iter()) {
            assert_eq!(*expected_size, buf.len());
        }
        assert!(allocator.try_allocate_buffer(2).is_err());
    }

    #[fuchsia::test]
    async fn test_allocate_buffer_read_write() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let mut buf = allocator.allocate_buffer(8192).await;
        buf.as_mut_slice().fill(0xaa as u8);
        let mut vec = vec![0 as u8; 8192];
        vec.copy_from_slice(buf.as_slice());
        assert_eq!(vec, vec![0xaa as u8; 8192]);
    }

    #[fuchsia::test]
    async fn test_allocate_buffer_consecutive_calls_do_not_overlap() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let buf1 = allocator.allocate_buffer(8192).await;
        let buf2 = allocator.allocate_buffer(8192).await;
        assert!(buf1.range().end <= buf2.range().start || buf2.range().end <= buf1.range().start);
    }

    #[fuchsia::test]
    async fn test_allocate_many_buffers() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        for _ in 0..10 {
            let _ = allocator.allocate_buffer(8192).await;
        }
    }

    #[fuchsia::test]
    async fn test_allocate_small_buffers_dont_overlap() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let buf1 = allocator.allocate_buffer(1).await;
        let buf2 = allocator.allocate_buffer(1).await;
        assert!(buf1.range().end <= buf2.range().start || buf2.range().end <= buf1.range().start);
    }

    #[fuchsia::test]
    async fn test_allocate_large_buffer() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let mut buf = allocator.allocate_buffer(1024 * 1024).await;
        assert_eq!(buf.len(), 1024 * 1024);
        buf.as_mut_slice().fill(0xaa as u8);
        let mut vec = vec![0 as u8; 1024 * 1024];
        vec.copy_from_slice(buf.as_slice());
        assert_eq!(vec, vec![0xaa as u8; 1024 * 1024]);
    }

    #[fuchsia::test]
    async fn test_allocate_large_buffer_after_smaller_buffers() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        {
            let mut buffers = vec![];
            while let Ok(buffer) = allocator.try_allocate_buffer(8192) {
                buffers.push(buffer);
            }
        }
        let buf = allocator.allocate_buffer(1024 * 1024).await;
        assert_eq!(buf.len(), 1024 * 1024);
    }

    #[fuchsia::test]
    async fn test_allocate_at_limits() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let mut buffers = vec![];
        while let Ok(buffer) = allocator.try_allocate_buffer(8192) {
            buffers.push(buffer);
        }
        // Deallocate a single buffer, and reallocate a single one back.
        buffers.pop();
        let buf = allocator.allocate_buffer(8192).await;
        assert_eq!(buf.len(), 8192);
    }

    #[fuchsia::test(threads = 10)]
    async fn test_random_allocs_deallocs() {
        let source = BufferSource::new(16 * 1024 * 1024);
        let bs = 512;
        let allocator = Arc::new(BufferAllocator::new(bs, source));

        join_all((0..10).map(|_| {
            let allocator = allocator.clone();
            fasync::Task::spawn(async move {
                let mut rng = thread_rng();
                enum Op {
                    Alloc,
                    Dealloc,
                }
                let ops = vec![Op::Alloc, Op::Dealloc];
                let mut buffers = vec![];
                for _ in 0..1000 {
                    match ops.choose(&mut rng).unwrap() {
                        Op::Alloc => {
                            // Rather than a uniform distribution 1..64K, first pick an order and
                            // then pick a size within that. For example, we might pick order 3,
                            // which would give us 8 * 512..16 * 512 as our possible range.
                            // This way we don't bias towards larger allocations too much.
                            let order: usize = rng.gen_range(order(1, bs)..order(65536 + 1, bs));
                            let size: usize = rng.gen_range(
                                bs * 2_usize.pow(order as u32)..bs * 2_usize.pow(order as u32 + 1),
                            );
                            if let Ok(mut buf) = allocator.try_allocate_buffer(size) {
                                let val = rng.gen::<u8>();
                                buf.as_mut_slice().fill(val);
                                for v in buf.as_slice() {
                                    assert_eq!(v, &val);
                                }
                                buffers.push(buf);
                            }
                        }
                        Op::Dealloc if !buffers.is_empty() => {
                            let idx = rng.gen_range(0..buffers.len());
                            buffers.remove(idx);
                        }
                        _ => {}
                    };
                }
            })
        }))
        .await;
    }

    #[fuchsia::test]
    async fn test_buffer_refs() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(512, source);

        // Allocate one buffer first so that |buf| is not starting at offset 0. This helps catch
        // bugs.
        let _buf = allocator.allocate_buffer(512).await;
        let mut buf = allocator.allocate_buffer(4096).await;
        let base = buf.range().start;
        {
            let mut bref = buf.subslice_mut(1000..2000);
            assert_eq!(bref.len(), 1000);
            assert_eq!(bref.range(), base + 1000..base + 2000);
            bref.as_mut_slice().fill(0xbb);
            {
                let mut bref2 = bref.reborrow().subslice_mut(0..100);
                assert_eq!(bref2.len(), 100);
                assert_eq!(bref2.range(), base + 1000..base + 1100);
                bref2.as_mut_slice().fill(0xaa);
            }
            {
                let mut bref2 = bref.reborrow().subslice_mut(900..1000);
                assert_eq!(bref2.len(), 100);
                assert_eq!(bref2.range(), base + 1900..base + 2000);
                bref2.as_mut_slice().fill(0xcc);
            }
            assert_eq!(bref.as_slice()[..100], vec![0xaa; 100]);
            assert_eq!(bref.as_slice()[100..900], vec![0xbb; 800]);

            let bref = bref.subslice_mut(900..);
            assert_eq!(bref.len(), 100);
            assert_eq!(bref.as_slice(), vec![0xcc; 100]);
        }
        {
            let bref = buf.as_ref();
            assert_eq!(bref.len(), 4096);
            assert_eq!(bref.range(), base..base + 4096);
            assert_eq!(bref.as_slice()[0..1000], vec![0x00; 1000]);
            {
                let bref2 = bref.subslice(1000..2000);
                assert_eq!(bref2.len(), 1000);
                assert_eq!(bref2.range(), base + 1000..base + 2000);
                assert_eq!(bref2.as_slice()[..100], vec![0xaa; 100]);
                assert_eq!(bref2.as_slice()[100..900], vec![0xbb; 800]);
                assert_eq!(bref2.as_slice()[900..1000], vec![0xcc; 100]);
            }

            let bref = bref.subslice(2048..);
            assert_eq!(bref.len(), 2048);
            assert_eq!(bref.as_slice(), vec![0x00; 2048]);
        }
    }

    #[fuchsia::test]
    async fn test_buffer_split() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(512, source);

        // Allocate one buffer first so that |buf| is not starting at offset 0. This helps catch
        // bugs.
        let _buf = allocator.allocate_buffer(512).await;
        let mut buf = allocator.allocate_buffer(4096).await;
        let base = buf.range().start;
        {
            let bref = buf.as_mut();
            let (mut s1, mut s2) = bref.split_at_mut(2048);
            assert_eq!(s1.len(), 2048);
            assert_eq!(s1.range(), base..base + 2048);
            s1.as_mut_slice().fill(0xaa);
            assert_eq!(s2.len(), 2048);
            assert_eq!(s2.range(), base + 2048..base + 4096);
            s2.as_mut_slice().fill(0xbb);
        }
        {
            let bref = buf.as_ref();
            let (s1, s2) = bref.split_at(1);
            let (s2, s3) = s2.split_at(2047);
            let (s3, s4) = s3.split_at(0);
            assert_eq!(s1.len(), 1);
            assert_eq!(s1.range(), base..base + 1);
            assert_eq!(s2.len(), 2047);
            assert_eq!(s2.range(), base + 1..base + 2048);
            assert_eq!(s3.len(), 0);
            assert_eq!(s3.range(), base + 2048..base + 2048);
            assert_eq!(s4.len(), 2048);
            assert_eq!(s4.range(), base + 2048..base + 4096);
            assert_eq!(s1.as_slice(), vec![0xaa; 1]);
            assert_eq!(s2.as_slice(), vec![0xaa; 2047]);
            assert_eq!(s3.as_slice(), vec![]);
            assert_eq!(s4.as_slice(), vec![0xbb; 2048]);
        }
    }

    #[fuchsia::test]
    async fn test_blocking_allocation() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = Arc::new(BufferAllocator::new(512, source));

        let buf1 = allocator.allocate_buffer(512 * 1024).await;
        let buf2 = allocator.allocate_buffer(512 * 1024).await;
        let bufs_dropped = Arc::new(AtomicBool::new(false));

        // buf3_fut should block until both buf1 and buf2 are dropped.
        let allocator_clone = allocator.clone();
        let bufs_dropped_clone = bufs_dropped.clone();
        let buf3_fut = async move {
            allocator_clone.allocate_buffer(1024 * 1024).await;
            assert!(bufs_dropped_clone.load(Ordering::Relaxed), "Allocation finished early");
        };
        pin_mut!(buf3_fut);

        // Each of buf_futs should block until buf3_fut is done, and they should proceed in order.
        let mut buf_futs = vec![];
        for _ in 0..16 {
            let allocator_clone = allocator.clone();
            let bufs_dropped_clone = bufs_dropped.clone();
            let fut = async move {
                allocator_clone.allocate_buffer(64 * 1024).await;
                // We can't say with certainty that buf3 proceeded first, nor can we ensure these
                // allocations proceed in order, but we can make sure that at least buf1/buf2 were
                // done (since they exhausted the pool).
                assert!(bufs_dropped_clone.load(Ordering::Relaxed), "Allocation finished early");
            };
            buf_futs.push(fut);
        }

        futures::join!(buf3_fut, join_all(buf_futs), async move {
            std::mem::drop(buf1);
            std::mem::drop(buf2);
            bufs_dropped.store(true, Ordering::Relaxed);
        });
    }
}