storage_device/buffer_allocator.rs

// Copyright 2021 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use crate::buffer::{Buffer, round_down, round_up};
use event_listener::{Event, EventListener};
use fuchsia_sync::Mutex;
use futures::{Future, FutureExt as _};
use std::collections::BTreeMap;
use std::ops::Range;
use std::pin::Pin;
use std::task::{Context, Poll};

#[cfg(target_os = "fuchsia")]
mod buffer_source {
    use fuchsia_runtime::vmar_root_self;
    use std::ops::Range;
    use std::sync::Arc;

    /// A buffer source backed by a VMO.
    #[derive(Debug)]
    pub struct BufferSource {
        base: *mut u8,
        size: usize,
        vmo: Arc<zx::Vmo>,
    }

    // SAFETY: BufferSource is not automatically Send and Sync because of the `*mut u8` field, but
    // that pointer is just the base address of the VMO mapping, so it is safe to send and share
    // BufferSource across threads.
    unsafe impl Send for BufferSource {}
    unsafe impl Sync for BufferSource {}

    impl BufferSource {
        pub fn new(size: usize) -> Self {
            let vmo = Arc::new(zx::Vmo::create(size as u64).unwrap());
            let name = zx::Name::new("transfer-buf").unwrap();
            vmo.set_name(&name).unwrap();
            let flags = zx::VmarFlags::PERM_READ
                | zx::VmarFlags::PERM_WRITE
                | zx::VmarFlags::MAP_RANGE
                | zx::VmarFlags::REQUIRE_NON_RESIZABLE;
            let base = vmar_root_self().map(0, &vmo, 0, size, flags).unwrap() as *mut u8;
            Self { base, size, vmo }
        }

        pub fn slice(&self) -> *mut [u8] {
            std::ptr::slice_from_raw_parts_mut(self.base, self.size)
        }

        pub fn size(&self) -> usize {
            self.size
        }

        pub fn vmo(&self) -> &Arc<zx::Vmo> {
            &self.vmo
        }

        #[allow(clippy::mut_from_ref)]
        pub(super) unsafe fn sub_slice(&self, range: &Range<usize>) -> &mut [u8] {
            assert!(range.start < self.size && range.end <= self.size);
            unsafe {
                std::slice::from_raw_parts_mut(self.base.add(range.start), range.end - range.start)
            }
        }

        /// Commits the range in memory to avoid future page faults.
        pub fn commit_range(&self, range: Range<usize>) -> Result<(), zx::Status> {
            self.vmo.op_range(zx::VmoOp::COMMIT, range.start as u64, range.len() as u64)
        }
    }

    impl Drop for BufferSource {
        fn drop(&mut self) {
            // SAFETY: This balances the `map` in `new` above.
            unsafe {
                let _ = vmar_root_self().unmap(self.base as usize, self.size);
            }
        }
    }
}

#[cfg(not(target_os = "fuchsia"))]
mod buffer_source {
    use std::cell::UnsafeCell;
    use std::ops::Range;
    use std::pin::Pin;

    /// A basic heap-backed buffer source.
    #[derive(Debug)]
    pub struct BufferSource {
        // We use an UnsafeCell here because we need interior mutability of the buffer (to hand out
        // mutable slices to it in |sub_slice()|), but don't want to pay the cost of wrapping the
        // buffer in a Mutex. We must guarantee that the Buffer objects we hand out don't overlap,
        // but that is already a requirement for correctness.
        data: UnsafeCell<Pin<Vec<u8>>>,
    }

    // SAFETY: None of the fields in BufferSource are modified except the contents of |data|, and
    // the BufferAllocator guarantees that the slices it hands out never overlap.
    unsafe impl Sync for BufferSource {}

    impl BufferSource {
        pub fn new(size: usize) -> Self {
            Self { data: UnsafeCell::new(Pin::new(vec![0 as u8; size])) }
        }

        pub fn size(&self) -> usize {
            // Safe because the reference goes out of scope as soon as we use it.
            unsafe { (&*self.data.get()).len() }
        }

        #[allow(clippy::mut_from_ref)]
        pub(super) unsafe fn sub_slice(&self, range: &Range<usize>) -> &mut [u8] {
            assert!(range.start < self.size() && range.end <= self.size());
            unsafe { &mut (&mut *self.data.get())[range.start..range.end] }
        }
    }
}

pub use buffer_source::BufferSource;

// Stores a sorted list of offsets of free blocks in the BufferSource. The size of the free blocks
// is implied by which FreeList (i.e. which order) the offsets are stored in.
type FreeList = Vec<usize>;

#[derive(Debug)]
struct Inner {
    // Free lists indexed by order; free_lists[n] holds the offsets of free blocks of
    // size_for_order(n) bytes.
    free_lists: Vec<FreeList>,
    // Maps offsets to allocated length (the actual length, not the size requested by the client).
    allocation_map: BTreeMap<usize, usize>,
}

/// BufferAllocator creates Buffer objects to be used for block device I/O requests.
///
/// This is implemented through a simple buddy allocation scheme.
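///
/// # Example
///
/// A minimal usage sketch (illustrative only; the sizes are arbitrary):
///
/// ```ignore
/// let source = BufferSource::new(1024 * 1024);
/// let allocator = BufferAllocator::new(512, source);
/// let mut buf = allocator.allocate_buffer(4096).await;
/// buf.as_mut_slice().fill(0xab);
/// // Dropping `buf` returns its space to the allocator.
/// ```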
#[derive(Debug)]
pub struct BufferAllocator {
    block_size: usize,
    source: BufferSource,
    inner: Mutex<Inner>,
    event: Event,
}

// Returns the smallest order whose buffers are at least |size| bytes.
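// For example, with a block size of 512: order(1, 512) == 0, order(513, 512) == 1, and
// order(2048, 512) == 2.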
fn order(size: usize, block_size: usize) -> usize {
    if size <= block_size {
        return 0;
    }
    let nblocks = round_up(size, block_size) / block_size;
    nblocks.next_power_of_two().trailing_zeros() as usize
}

// Returns the largest order whose buffers are no more than |size| bytes.
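// For example, with a block size of 512: order_fit(512, 512) == 0, order_fit(1536, 512) == 1, and
// order_fit(2048, 512) == 2.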
fn order_fit(size: usize, block_size: usize) -> usize {
    assert!(size >= block_size);
    let nblocks = round_up(size, block_size) / block_size;
    if nblocks.is_power_of_two() {
        nblocks.trailing_zeros() as usize
    } else {
        nblocks.next_power_of_two().trailing_zeros() as usize - 1
    }
}

fn size_for_order(order: usize, block_size: usize) -> usize {
    block_size * (1 << (order as u32))
}

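// Carves the block-aligned source into power-of-two-sized free chunks, largest first. For
// example, a 123-byte source with a block size of 2 is carved into chunks of 64 + 32 + 16 + 8 + 2
// bytes, leaving the final odd byte unusable.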
fn initial_free_lists(size: usize, block_size: usize) -> Vec<FreeList> {
    let size = round_down(size, block_size);
    assert!(block_size <= size);
    assert!(block_size.is_power_of_two());
    let max_order = order_fit(size, block_size);
    let mut free_lists = Vec::new();
    for _ in 0..max_order + 1 {
        free_lists.push(FreeList::new())
    }
    let mut offset = 0;
    while offset < size {
        let order = order_fit(size - offset, block_size);
        let size = size_for_order(order, block_size);
        free_lists[order].push(offset);
        offset += size;
    }
    free_lists
}

/// A future which will resolve to an allocated [`Buffer`].
pub struct BufferFuture<'a> {
    allocator: &'a BufferAllocator,
    size: usize,
    listener: Option<EventListener>,
}

impl<'a> Future for BufferFuture<'a> {
    type Output = Buffer<'a>;

    fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        if let Some(listener) = self.listener.as_mut() {
            futures::ready!(listener.poll_unpin(context));
        }
        // Loop because we need to deal with the case where `listener` is ready immediately upon
        // creation, in which case we ought to retry the allocation.
        loop {
            match self.allocator.try_allocate_buffer(self.size) {
                Ok(buffer) => return Poll::Ready(buffer),
                Err(mut listener) => {
                    if listener.poll_unpin(context).is_pending() {
                        self.listener = Some(listener);
                        return Poll::Pending;
                    }
                }
            }
        }
    }
}

impl BufferAllocator {
    pub fn new(block_size: usize, source: BufferSource) -> Self {
        let free_lists = initial_free_lists(source.size(), block_size);
        Self {
            block_size,
            source,
            inner: Mutex::new(Inner { free_lists, allocation_map: BTreeMap::new() }),
            event: Event::new(),
        }
    }

    pub fn block_size(&self) -> usize {
        self.block_size
    }

    pub fn buffer_source(&self) -> &BufferSource {
        &self.source
    }

    /// Takes the buffer source from the allocator and consumes the allocator.
    pub fn take_buffer_source(self) -> BufferSource {
        self.source
    }

    /// Allocates a Buffer with capacity for |size| bytes. Panics if the requested size exceeds the
    /// pool size. The returned future resolves once enough space is available to satisfy the
    /// request.
    ///
    /// The allocated buffer will be block-aligned, and the padding up to block alignment can also
    /// be used by the buffer.
    ///
    /// Allocation is O(lg(N) + M), where N = size and M = number of allocations.
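    ///
    /// A sketch of typical use (illustrative only):
    ///
    /// ```ignore
    /// let buf = allocator.allocate_buffer(8192).await;
    /// assert_eq!(buf.len(), 8192);
    /// ```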
    pub fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
        BufferFuture { allocator: self, size, listener: None }
    }

    /// Like |allocate_buffer|, but returns an EventListener if the allocation cannot be satisfied.
    /// The listener will signal when the caller should try again.
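    ///
    /// A sketch of how a caller might react to exhaustion (illustrative only):
    ///
    /// ```ignore
    /// let buf = match allocator.try_allocate_buffer(8192) {
    ///     Ok(buf) => buf,
    ///     Err(listener) => {
    ///         // The pool is currently exhausted; wait for a free, then fall back to awaiting.
    ///         listener.await;
    ///         allocator.allocate_buffer(8192).await
    ///     }
    /// };
    /// ```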
    pub fn try_allocate_buffer(&self, size: usize) -> Result<Buffer<'_>, EventListener> {
        if size > self.source.size() {
            panic!("Allocation of {} bytes would exceed limit {}", size, self.source.size());
        }
        let mut inner = self.inner.lock();
        let requested_order = order(size, self.block_size());
        assert!(requested_order < inner.free_lists.len());
        // Pick the smallest possible order with a free entry.
        let mut order = {
            let mut idx = requested_order;
            loop {
                if idx >= inner.free_lists.len() {
                    return Err(self.event.listen());
                }
                if !inner.free_lists[idx].is_empty() {
                    break idx;
                }
                idx += 1;
            }
        };

        // Split the free region until it's the right size.
        let offset = inner.free_lists[order].pop().unwrap();
        while order > requested_order {
            order -= 1;
            assert!(inner.free_lists[order].is_empty());
            inner.free_lists[order].push(offset + self.size_for_order(order));
        }

        inner.allocation_map.insert(offset, self.size_for_order(order));
        let range = offset..offset + size;
        log::debug!(range:?, bytes_used = self.size_for_order(order); "Allocated");

        // SAFETY: The allocator never hands out overlapping regions, so the slice is not aliased
        // by any other outstanding Buffer.
        Ok(Buffer::new(unsafe { self.source.sub_slice(&range) }, range, &self))
    }

    /// Deallocation is O(lg(N) + M), where N = size and M = number of allocations.
    #[doc(hidden)]
    pub(super) fn free_buffer(&self, range: Range<usize>) {
        let mut inner = self.inner.lock();
        let mut offset = range.start;
        let size = inner
            .allocation_map
            .remove(&offset)
            .unwrap_or_else(|| panic!("No allocation record found for {:?}", range));
        assert!(range.end - range.start <= size);
        log::debug!(range:?, bytes_used = size; "Freeing");

        // Merge as many free slots as we can.
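        // For example, with a 512-byte block size, if the order-0 block at offset 512 is freed
        // and its buddy at offset 0 is also free, the two merge into an order-1 block at offset
        // 0, which may in turn merge with its own buddy, and so on.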
        let mut order = order(size, self.block_size());
        while order < inner.free_lists.len() - 1 {
            let buddy = self.find_buddy(offset, order);
            let idx = if let Ok(idx) = inner.free_lists[order].binary_search(&buddy) {
                idx
            } else {
                break;
            };
            inner.free_lists[order].remove(idx);
            offset = std::cmp::min(offset, buddy);
            order += 1;
        }

        let idx = inner.free_lists[order]
            .binary_search(&offset)
            .expect_err(&format!("Unexpectedly found {} in free list {}", offset, order));
        inner.free_lists[order].insert(idx, offset);

        // Notify all stuck tasks.  This might be inefficient, but it's simple and correct.
        self.event.notify(usize::MAX);
    }

    fn size_for_order(&self, order: usize) -> usize {
        size_for_order(order, self.block_size)
    }

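    // A block's buddy is the adjacent block of the same order that it can merge with; it is found
    // by flipping the bit corresponding to the block's size. For example, with 512-byte blocks,
    // the order-0 buddy of offset 512 is offset 0, and vice versa.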
    fn find_buddy(&self, offset: usize, order: usize) -> usize {
        offset ^ self.size_for_order(order)
    }
}

#[cfg(test)]
mod tests {
    use crate::buffer_allocator::{BufferAllocator, BufferSource, order};
    use fuchsia_async as fasync;
    use futures::future::join_all;
    use futures::pin_mut;
    use rand::seq::IndexedRandom;
    use rand::{Rng, rng};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    #[fuchsia::test]
    async fn test_odd_sized_buffer_source() {
        let source = BufferSource::new(123);
        let allocator = BufferAllocator::new(2, source);

        // 123 == 64 + 32 + 16 + 8 + 2 + 1. (The last byte is unusable.)
        let sizes = vec![64, 32, 16, 8, 2];
        let mut bufs = vec![];
        for size in sizes.iter() {
            bufs.push(allocator.allocate_buffer(*size).await);
        }
        for (expected_size, buf) in sizes.iter().zip(bufs.iter()) {
            assert_eq!(*expected_size, buf.len());
        }
        assert!(allocator.try_allocate_buffer(2).is_err());
    }

    #[fuchsia::test]
    async fn test_allocate_buffer_read_write() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let mut buf = allocator.allocate_buffer(8192).await;
        buf.as_mut_slice().fill(0xaa as u8);
        let mut vec = vec![0 as u8; 8192];
        vec.copy_from_slice(buf.as_slice());
        assert_eq!(vec, vec![0xaa as u8; 8192]);
    }

    #[fuchsia::test]
    async fn test_allocate_buffer_consecutive_calls_do_not_overlap() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let buf1 = allocator.allocate_buffer(8192).await;
        let buf2 = allocator.allocate_buffer(8192).await;
        assert!(buf1.range().end <= buf2.range().start || buf2.range().end <= buf1.range().start);
    }

    #[fuchsia::test]
    async fn test_allocate_many_buffers() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        for _ in 0..10 {
            let _ = allocator.allocate_buffer(8192).await;
        }
    }

    #[fuchsia::test]
    async fn test_allocate_small_buffers_dont_overlap() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let buf1 = allocator.allocate_buffer(1).await;
        let buf2 = allocator.allocate_buffer(1).await;
        assert!(buf1.range().end <= buf2.range().start || buf2.range().end <= buf1.range().start);
    }

    #[fuchsia::test]
    async fn test_allocate_large_buffer() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let mut buf = allocator.allocate_buffer(1024 * 1024).await;
        assert_eq!(buf.len(), 1024 * 1024);
        buf.as_mut_slice().fill(0xaa as u8);
        let mut vec = vec![0 as u8; 1024 * 1024];
        vec.copy_from_slice(buf.as_slice());
        assert_eq!(vec, vec![0xaa as u8; 1024 * 1024]);
    }

    #[fuchsia::test]
    async fn test_allocate_large_buffer_after_smaller_buffers() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        {
            let mut buffers = vec![];
            while let Ok(buffer) = allocator.try_allocate_buffer(8192) {
                buffers.push(buffer);
            }
        }
        let buf = allocator.allocate_buffer(1024 * 1024).await;
        assert_eq!(buf.len(), 1024 * 1024);
    }

    #[fuchsia::test]
    async fn test_allocate_at_limits() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let mut buffers = vec![];
        while let Ok(buffer) = allocator.try_allocate_buffer(8192) {
            buffers.push(buffer);
        }
        // Deallocate a single buffer, and reallocate a single one back.
        buffers.pop();
        let buf = allocator.allocate_buffer(8192).await;
        assert_eq!(buf.len(), 8192);
    }

    #[fuchsia::test(threads = 10)]
    async fn test_random_allocs_deallocs() {
        let source = BufferSource::new(16 * 1024 * 1024);
        let bs = 512;
        let allocator = Arc::new(BufferAllocator::new(bs, source));

        join_all((0..10).map(|_| {
            let allocator = allocator.clone();
            fasync::Task::spawn(async move {
                let mut rng = rng();
                enum Op {
                    Alloc,
                    Dealloc,
                }
                let ops = vec![Op::Alloc, Op::Dealloc];
                let mut buffers = vec![];
                for _ in 0..1000 {
                    match ops.choose(&mut rng).unwrap() {
                        Op::Alloc => {
                            // Rather than a uniform distribution 1..64K, first pick an order and
                            // then pick a size within that. For example, we might pick order 3,
                            // which would give us 8 * 512..16 * 512 as our possible range.
                            // This way we don't bias towards larger allocations too much.
                            let order: usize = rng.random_range(order(1, bs)..order(65536 + 1, bs));
                            let size: usize = rng.random_range(
                                bs * 2_usize.pow(order as u32)..bs * 2_usize.pow(order as u32 + 1),
                            );
                            if let Ok(mut buf) = allocator.try_allocate_buffer(size) {
                                let val = rng.random::<u8>();
                                buf.as_mut_slice().fill(val);
                                for v in buf.as_slice() {
                                    assert_eq!(v, &val);
                                }
                                buffers.push(buf);
                            }
                        }
                        Op::Dealloc if !buffers.is_empty() => {
                            let idx = rng.random_range(0..buffers.len());
                            buffers.remove(idx);
                        }
                        _ => {}
                    };
                }
            })
        }))
        .await;
    }

    #[fuchsia::test]
    async fn test_buffer_refs() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(512, source);

        // Allocate one buffer first so that |buf| is not starting at offset 0. This helps catch
        // bugs.
        let _buf = allocator.allocate_buffer(512).await;
        let mut buf = allocator.allocate_buffer(4096).await;
        let base = buf.range().start;
        {
            let mut bref = buf.subslice_mut(1000..2000);
            assert_eq!(bref.len(), 1000);
            assert_eq!(bref.range(), base + 1000..base + 2000);
            bref.as_mut_slice().fill(0xbb);
            {
                let mut bref2 = bref.reborrow().subslice_mut(0..100);
                assert_eq!(bref2.len(), 100);
                assert_eq!(bref2.range(), base + 1000..base + 1100);
                bref2.as_mut_slice().fill(0xaa);
            }
            {
                let mut bref2 = bref.reborrow().subslice_mut(900..1000);
                assert_eq!(bref2.len(), 100);
                assert_eq!(bref2.range(), base + 1900..base + 2000);
                bref2.as_mut_slice().fill(0xcc);
            }
            assert_eq!(bref.as_slice()[..100], vec![0xaa; 100]);
            assert_eq!(bref.as_slice()[100..900], vec![0xbb; 800]);

            let bref = bref.subslice_mut(900..);
            assert_eq!(bref.len(), 100);
            assert_eq!(bref.as_slice(), vec![0xcc; 100]);
        }
        {
            let bref = buf.as_ref();
            assert_eq!(bref.len(), 4096);
            assert_eq!(bref.range(), base..base + 4096);
            assert_eq!(bref.as_slice()[0..1000], vec![0x00; 1000]);
            {
                let bref2 = bref.subslice(1000..2000);
                assert_eq!(bref2.len(), 1000);
                assert_eq!(bref2.range(), base + 1000..base + 2000);
                assert_eq!(bref2.as_slice()[..100], vec![0xaa; 100]);
                assert_eq!(bref2.as_slice()[100..900], vec![0xbb; 800]);
                assert_eq!(bref2.as_slice()[900..1000], vec![0xcc; 100]);
            }

            let bref = bref.subslice(2048..);
            assert_eq!(bref.len(), 2048);
            assert_eq!(bref.as_slice(), vec![0x00; 2048]);
        }
    }

    #[fuchsia::test]
    async fn test_buffer_split() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(512, source);

        // Allocate one buffer first so that |buf| is not starting at offset 0. This helps catch
        // bugs.
        let _buf = allocator.allocate_buffer(512).await;
        let mut buf = allocator.allocate_buffer(4096).await;
        let base = buf.range().start;
        {
            let bref = buf.as_mut();
            let (mut s1, mut s2) = bref.split_at_mut(2048);
            assert_eq!(s1.len(), 2048);
            assert_eq!(s1.range(), base..base + 2048);
            s1.as_mut_slice().fill(0xaa);
            assert_eq!(s2.len(), 2048);
            assert_eq!(s2.range(), base + 2048..base + 4096);
            s2.as_mut_slice().fill(0xbb);
        }
        {
            let bref = buf.as_ref();
            let (s1, s2) = bref.split_at(1);
            let (s2, s3) = s2.split_at(2047);
            let (s3, s4) = s3.split_at(0);
            assert_eq!(s1.len(), 1);
            assert_eq!(s1.range(), base..base + 1);
            assert_eq!(s2.len(), 2047);
            assert_eq!(s2.range(), base + 1..base + 2048);
            assert_eq!(s3.len(), 0);
            assert_eq!(s3.range(), base + 2048..base + 2048);
            assert_eq!(s4.len(), 2048);
            assert_eq!(s4.range(), base + 2048..base + 4096);
            assert_eq!(s1.as_slice(), vec![0xaa; 1]);
            assert_eq!(s2.as_slice(), vec![0xaa; 2047]);
            assert_eq!(s3.as_slice(), &[] as &[u8]);
            assert_eq!(s4.as_slice(), vec![0xbb; 2048]);
        }
    }

    #[fuchsia::test]
    async fn test_blocking_allocation() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = Arc::new(BufferAllocator::new(512, source));

        let buf1 = allocator.allocate_buffer(512 * 1024).await;
        let buf2 = allocator.allocate_buffer(512 * 1024).await;
        let bufs_dropped = Arc::new(AtomicBool::new(false));

        // buf3_fut should block until both buf1 and buf2 are done.
        let allocator_clone = allocator.clone();
        let bufs_dropped_clone = bufs_dropped.clone();
        let buf3_fut = async move {
            allocator_clone.allocate_buffer(1024 * 1024).await;
            assert!(bufs_dropped_clone.load(Ordering::Relaxed), "Allocation finished early");
        };
        pin_mut!(buf3_fut);

        // Each of buf_futs should block until buf3_fut is done, and they should proceed in order.
        let mut buf_futs = vec![];
        for _ in 0..16 {
            let allocator_clone = allocator.clone();
            let bufs_dropped_clone = bufs_dropped.clone();
            let fut = async move {
                allocator_clone.allocate_buffer(64 * 1024).await;
                // We can't say with certainty that buf3 proceeded first, nor can we ensure these
                // allocations proceed in order, but we can make sure that at least buf1/buf2 were
                // done (since they exhausted the pool).
                assert!(bufs_dropped_clone.load(Ordering::Relaxed), "Allocation finished early");
            };
            buf_futs.push(fut);
        }

        futures::join!(buf3_fut, join_all(buf_futs), async move {
            std::mem::drop(buf1);
            std::mem::drop(buf2);
            bufs_dropped.store(true, Ordering::Relaxed);
        });
    }
}