use crate::buffer::{Buffer, round_down, round_up};
use event_listener::{Event, EventListener};
use fuchsia_sync::Mutex;
use futures::{Future, FutureExt as _};
use std::collections::BTreeMap;
use std::ops::Range;
use std::pin::Pin;
use std::task::{Context, Poll};

#[cfg(target_os = "fuchsia")]
mod buffer_source {
    use fuchsia_runtime::vmar_root_self;
    use std::ops::Range;
    use std::sync::Arc;

    /// A source of buffer memory, backed by a VMO which is mapped into the root VMAR for the
    /// lifetime of the source.
    #[derive(Debug)]
    pub struct BufferSource {
        base: *mut u8,
        size: usize,
        vmo: Arc<zx::Vmo>,
    }

    // SAFETY: The mapping behind `base` lives as long as the BufferSource, and callers of
    // `sub_slice` are responsible for avoiding aliased access to the memory it hands out.
    unsafe impl Send for BufferSource {}
    unsafe impl Sync for BufferSource {}

    impl BufferSource {
        pub fn new(size: usize) -> Self {
            let vmo = Arc::new(zx::Vmo::create(size as u64).unwrap());
            let name = zx::Name::new("transfer-buf").unwrap();
            vmo.set_name(&name).unwrap();
            let flags = zx::VmarFlags::PERM_READ
                | zx::VmarFlags::PERM_WRITE
                | zx::VmarFlags::MAP_RANGE
                | zx::VmarFlags::REQUIRE_NON_RESIZABLE;
            let base = vmar_root_self().map(0, &vmo, 0, size, flags).unwrap() as *mut u8;
            Self { base, size, vmo }
        }

        pub fn slice(&self) -> *mut [u8] {
            std::ptr::slice_from_raw_parts_mut(self.base, self.size)
        }

        pub fn size(&self) -> usize {
            self.size
        }

        pub fn vmo(&self) -> &Arc<zx::Vmo> {
            &self.vmo
        }

        /// # Safety
        ///
        /// The caller must guarantee that `range` is not concurrently accessed through any other
        /// slice handed out by this source.
        #[allow(clippy::mut_from_ref)]
        pub(super) unsafe fn sub_slice(&self, range: &Range<usize>) -> &mut [u8] {
            assert!(range.start < self.size && range.end <= self.size);
            unsafe {
                std::slice::from_raw_parts_mut(self.base.add(range.start), range.end - range.start)
            }
        }

        /// Commits the pages covering `range` so that they are resident in memory.
        pub fn commit_range(&self, range: Range<usize>) -> Result<(), zx::Status> {
            self.vmo.op_range(zx::VmoOp::COMMIT, range.start as u64, range.len() as u64)
        }

        /// Zeroes the pages covering `range`.
        ///
        /// # Safety
        ///
        /// The caller must guarantee that `range` is not part of any live allocation.
        pub(super) unsafe fn clean_range(&self, range: Range<usize>) {
            let _ = self.vmo.op_range(zx::VmoOp::ZERO, range.start as u64, range.len() as u64);
        }
    }

    impl Drop for BufferSource {
        fn drop(&mut self) {
            // SAFETY: The mapping was created in `new` and is not referenced after this point.
            unsafe {
                let _ = vmar_root_self().unmap(self.base as usize, self.size);
            }
        }
    }
}

#[cfg(not(target_os = "fuchsia"))]
mod buffer_source {
    use std::cell::UnsafeCell;
    use std::ops::Range;
    use std::pin::Pin;

    /// A heap-backed source of buffer memory, used on builds where VMOs are unavailable.
    #[derive(Debug)]
    pub struct BufferSource {
        data: UnsafeCell<Pin<Vec<u8>>>,
    }

    // SAFETY: Callers of `sub_slice` are responsible for avoiding aliased access to the memory it
    // hands out.
    unsafe impl Sync for BufferSource {}

    impl BufferSource {
        pub fn new(size: usize) -> Self {
            Self { data: UnsafeCell::new(Pin::new(vec![0u8; size])) }
        }

        pub fn size(&self) -> usize {
            unsafe { (&*self.data.get()).len() }
        }

        /// # Safety
        ///
        /// The caller must guarantee that `range` is not concurrently accessed through any other
        /// slice handed out by this source.
        #[allow(clippy::mut_from_ref)]
        pub(super) unsafe fn sub_slice(&self, range: &Range<usize>) -> &mut [u8] {
            assert!(range.start < self.size() && range.end <= self.size());
            unsafe { &mut (&mut *self.data.get())[range.start..range.end] }
        }

        /// Zeroes `range`.
        ///
        /// # Safety
        ///
        /// The caller must guarantee that `range` is not part of any live allocation.
        pub(super) unsafe fn clean_range(&self, range: Range<usize>) {
            unsafe { self.sub_slice(&range) }.fill(0);
        }
    }
}

pub use buffer_source::BufferSource;

/// Each free list stores the offsets of the free blocks of a single order, kept sorted so that a
/// buddy can be located with a binary search.
type FreeList = Vec<usize>;

#[derive(Debug)]
struct Inner {
    // The free blocks, indexed by order (free_lists[n] holds blocks of `block_size << n` bytes).
    free_lists: Vec<FreeList>,
    // Maps the offset of each outstanding allocation to the number of bytes reserved for it.
    allocation_map: BTreeMap<usize, usize>,
}

/// A simple buddy allocator over a contiguous [`BufferSource`], used to hand out [`Buffer`]
/// objects for transferring data to and from block devices. Each allocation reserves a
/// power-of-two multiple of `block_size` bytes, and requests which cannot be satisfied wait until
/// enough buffers have been dropped.
#[derive(Debug)]
pub struct BufferAllocator {
    block_size: usize,
    source: BufferSource,
    inner: Mutex<Inner>,
    event: Event,
}

/// Returns the smallest order whose block size is large enough to hold `size` bytes.
fn order(size: usize, block_size: usize) -> usize {
    if size <= block_size {
        return 0;
    }
    let nblocks = round_up(size, block_size) / block_size;
    nblocks.next_power_of_two().trailing_zeros() as usize
}

/// Returns the largest order whose block size does not exceed `size` (after `size` has been
/// rounded up to a whole number of blocks).
fn order_fit(size: usize, block_size: usize) -> usize {
    assert!(size >= block_size);
    let nblocks = round_up(size, block_size) / block_size;
    if nblocks.is_power_of_two() {
        nblocks.trailing_zeros() as usize
    } else {
        nblocks.next_power_of_two().trailing_zeros() as usize - 1
    }
}

/// Returns the size in bytes of a block of the given order.
fn size_for_order(order: usize, block_size: usize) -> usize {
    block_size * (1 << (order as u32))
}

/// Builds the initial free lists for a source of `size` bytes by carving it into the largest
/// power-of-two blocks that fit, working from the start of the source onwards.
fn initial_free_lists(size: usize, block_size: usize) -> Vec<FreeList> {
    let size = round_down(size, block_size);
    assert!(block_size <= size);
    assert!(block_size.is_power_of_two());
    let max_order = order_fit(size, block_size);
    let mut free_lists = Vec::with_capacity(max_order + 1);
    for _ in 0..=max_order {
        free_lists.push(FreeList::new())
    }
    let mut offset = 0;
    while offset < size {
        let order = order_fit(size - offset, block_size);
        let size = size_for_order(order, block_size);
        free_lists[order].push(offset);
        offset += size;
    }
    free_lists
}

/// A future which resolves to a [`Buffer`] once the allocator has enough free space to satisfy
/// the request.
pub struct BufferFuture<'a> {
    allocator: &'a BufferAllocator,
    size: usize,
    listener: Option<EventListener>,
}

impl<'a> Future for BufferFuture<'a> {
    type Output = Buffer<'a>;

    fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        if let Some(listener) = self.listener.as_mut() {
            futures::ready!(listener.poll_unpin(context));
        }
        // Loop because the listener returned by a failed allocation attempt may already have been
        // notified, in which case we should retry immediately rather than returning Pending.
        loop {
            match self.allocator.try_allocate_buffer(self.size) {
                Ok(buffer) => return Poll::Ready(buffer),
                Err(mut listener) => {
                    if listener.poll_unpin(context).is_pending() {
                        self.listener = Some(listener);
                        return Poll::Pending;
                    }
                }
            }
        }
    }
}

impl BufferAllocator {
    /// Creates an allocator over `source`, using blocks of `block_size` bytes (which must be a
    /// power of two).
    pub fn new(block_size: usize, source: BufferSource) -> Self {
        let free_lists = initial_free_lists(source.size(), block_size);
        Self {
            block_size,
            source,
            inner: Mutex::new(Inner { free_lists, allocation_map: BTreeMap::new() }),
            event: Event::new(),
        }
    }

    pub fn block_size(&self) -> usize {
        self.block_size
    }

    pub fn buffer_source(&self) -> &BufferSource {
        &self.source
    }

    /// Consumes the allocator and returns the underlying [`BufferSource`].
    pub fn take_buffer_source(self) -> BufferSource {
        self.source
    }

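    /// Returns a future which resolves to a [`Buffer`] of `size` bytes, waiting if necessary for
    /// other buffers to be dropped. The buffer reserves a power-of-two multiple of `block_size`
    /// bytes from the underlying source.
    ///
    /// A minimal usage sketch (illustrative only; the sizes are arbitrary):
    ///
    /// ```ignore
    /// let allocator = BufferAllocator::new(512, BufferSource::new(1024 * 1024));
    /// let mut buf = allocator.allocate_buffer(4096).await;
    /// buf.as_mut_slice().fill(0xab);
    /// // The space is returned to the allocator when `buf` is dropped.
    /// ```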
    pub fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
        BufferFuture { allocator: self, size, listener: None }
    }

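    /// Attempts to allocate a [`Buffer`] of `size` bytes without waiting. On failure, returns an
    /// [`EventListener`] which is signalled once another buffer has been freed, at which point
    /// the caller can retry.
    ///
    /// Panics if `size` is larger than the underlying [`BufferSource`].
    ///
    /// An illustrative sketch of the retry pattern (this is essentially what [`BufferFuture`]
    /// does):
    ///
    /// ```ignore
    /// let allocator = BufferAllocator::new(512, BufferSource::new(1024 * 1024));
    /// let buffer = loop {
    ///     match allocator.try_allocate_buffer(4096) {
    ///         Ok(buffer) => break buffer,
    ///         // No space right now; wait until some other buffer is freed, then retry.
    ///         Err(listener) => listener.await,
    ///     }
    /// };
    /// ```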
    pub fn try_allocate_buffer(&self, size: usize) -> Result<Buffer<'_>, EventListener> {
        if size > self.source.size() {
            panic!("Allocation of {} bytes would exceed limit {}", size, self.source.size());
        }
        let mut inner = self.inner.lock();
        let requested_order = order(size, self.block_size());
        assert!(requested_order < inner.free_lists.len());
        // Find the smallest order with a free block that can hold the request.
        let mut order = {
            let mut idx = requested_order;
            loop {
                if idx >= inner.free_lists.len() {
                    return Err(self.event.listen());
                }
                if !inner.free_lists[idx].is_empty() {
                    break idx;
                }
                idx += 1;
            }
        };

        let offset = inner.free_lists[order].pop().unwrap();
        // Split the block down to the requested order, returning the upper halves to the free
        // lists as we go.
        while order > requested_order {
            order -= 1;
            assert!(inner.free_lists[order].is_empty());
            inner.free_lists[order].push(offset + self.size_for_order(order));
        }

        inner.allocation_map.insert(offset, self.size_for_order(order));
        let range = offset..offset + size;
        log::debug!(range:?, bytes_used = self.size_for_order(order); "Allocated");

        Ok(Buffer::new(unsafe { self.source.sub_slice(&range) }, range, self))
    }

    /// Returns the given range to the allocator, coalescing it with any free buddies. Not
    /// intended to be called directly; it is used when a [`Buffer`] is dropped.
    #[doc(hidden)]
    pub(super) fn free_buffer(&self, range: Range<usize>) {
        let mut inner = self.inner.lock();
        let mut offset = range.start;
        let size = inner
            .allocation_map
            .remove(&offset)
            .unwrap_or_else(|| panic!("No allocation record found for {:?}", range));
        assert!(range.end - range.start <= size);
        log::debug!(range:?, bytes_used = size; "Freeing");

        // Merge the block with its buddy, repeatedly, for as long as the buddy is also free.
        let mut order = order(size, self.block_size());
        while order < inner.free_lists.len() - 1 {
            let buddy = self.find_buddy(offset, order);
            let idx = if let Ok(idx) = inner.free_lists[order].binary_search(&buddy) {
                idx
            } else {
                break;
            };
            inner.free_lists[order].remove(idx);
            offset = std::cmp::min(offset, buddy);
            order += 1;
        }

        let idx = match inner.free_lists[order].binary_search(&offset) {
            Ok(_) => panic!("Unexpectedly found {} in free list {}", offset, order),
            Err(idx) => idx,
        };
        inner.free_lists[order].insert(idx, offset);

        // Wake every waiting allocation; each one re-attempts and re-queues itself if needed.
        self.event.notify(usize::MAX);
    }

    fn size_for_order(&self, order: usize) -> usize {
        size_for_order(order, self.block_size)
    }

    /// Returns the offset of the buddy of the block of the given order at `offset`.
    fn find_buddy(&self, offset: usize, order: usize) -> usize {
        offset ^ self.size_for_order(order)
    }

    /// Zeroes all currently free ranges in the underlying source. Ranges which are allocated are
    /// left untouched.
    pub fn clean_transfer_buffer(&self) {
        let inner = self.inner.lock();
        for (n, free_list) in inner.free_lists.iter().enumerate() {
            for offset in free_list {
                // SAFETY: Holding the lock on `inner` guarantees these ranges stay free, so
                // nothing else can be accessing them.
                unsafe {
                    self.source
                        .clean_range(*offset..(*offset + size_for_order(n, self.block_size)))
                };
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::buffer_allocator::{BufferAllocator, BufferSource, order};
    use fuchsia_async as fasync;
    use futures::future::join_all;
    use futures::pin_mut;
    use rand::seq::IndexedRandom;
    use rand::{Rng, rng};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    #[fuchsia::test]
    async fn test_odd_sized_buffer_source() {
        let source = BufferSource::new(123);
        let allocator = BufferAllocator::new(2, source);

        // These sizes sum to 122 bytes, which is all a 123-byte source offers once it is rounded
        // down to the 2-byte block size, so the final try_allocate_buffer should fail.
        let sizes = vec![64, 32, 16, 8, 2];
        let mut bufs = vec![];
        for size in sizes.iter() {
            bufs.push(allocator.allocate_buffer(*size).await);
        }
        for (expected_size, buf) in sizes.iter().zip(bufs.iter()) {
            assert_eq!(*expected_size, buf.len());
        }
        assert!(allocator.try_allocate_buffer(2).is_err());
    }
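
    // Illustrative sketch, not part of the original suite: spells out the buddy-order arithmetic
    // (`order`, `order_fit` and `size_for_order`) that the allocator is built on, assuming a
    // 512-byte block size.
    #[fuchsia::test]
    async fn test_order_helpers() {
        use crate::buffer_allocator::{order_fit, size_for_order};
        let bs = 512;
        // `order` rounds up to the smallest power-of-two number of blocks that holds `size`.
        assert_eq!(order(1, bs), 0);
        assert_eq!(order(512, bs), 0);
        assert_eq!(order(513, bs), 1);
        assert_eq!(order(1025, bs), 2);
        // `order_fit` rounds down to the largest power-of-two number of blocks within `size`.
        assert_eq!(order_fit(512, bs), 0);
        assert_eq!(order_fit(1536, bs), 1);
        assert_eq!(order_fit(2048, bs), 2);
        // `size_for_order` converts an order back into a size in bytes.
        assert_eq!(size_for_order(0, bs), 512);
        assert_eq!(size_for_order(3, bs), 4096);
    }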

    #[fuchsia::test]
    async fn test_allocate_buffer_read_write() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let mut buf = allocator.allocate_buffer(8192).await;
        buf.as_mut_slice().fill(0xaa);
        let mut vec = vec![0u8; 8192];
        vec.copy_from_slice(buf.as_slice());
        assert_eq!(vec, vec![0xaa; 8192]);
    }

    #[fuchsia::test]
    async fn test_allocate_buffer_consecutive_calls_do_not_overlap() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let buf1 = allocator.allocate_buffer(8192).await;
        let buf2 = allocator.allocate_buffer(8192).await;
        assert!(buf1.range().end <= buf2.range().start || buf2.range().end <= buf1.range().start);
    }

    #[fuchsia::test]
    async fn test_allocate_many_buffers() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        for _ in 0..10 {
            let _ = allocator.allocate_buffer(8192).await;
        }
    }

    #[fuchsia::test]
    async fn test_allocate_small_buffers_dont_overlap() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let buf1 = allocator.allocate_buffer(1).await;
        let buf2 = allocator.allocate_buffer(1).await;
        assert!(buf1.range().end <= buf2.range().start || buf2.range().end <= buf1.range().start);
    }
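
    // Illustrative sketch, not part of the original suite: even a 1-byte request reserves a whole
    // block, so a two-block source is exhausted by two such requests.
    #[fuchsia::test]
    async fn test_small_allocations_consume_whole_blocks() {
        let source = BufferSource::new(16384);
        let allocator = BufferAllocator::new(8192, source);

        let _buf1 = allocator.allocate_buffer(1).await;
        let _buf2 = allocator.allocate_buffer(1).await;
        // Both 8192-byte blocks are now reserved, so a third allocation must wait.
        assert!(allocator.try_allocate_buffer(1).is_err());
    }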

    #[fuchsia::test]
    async fn test_allocate_large_buffer() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let mut buf = allocator.allocate_buffer(1024 * 1024).await;
        assert_eq!(buf.len(), 1024 * 1024);
        buf.as_mut_slice().fill(0xaa);
        let mut vec = vec![0u8; 1024 * 1024];
        vec.copy_from_slice(buf.as_slice());
        assert_eq!(vec, vec![0xaa; 1024 * 1024]);
    }

    #[fuchsia::test]
    async fn test_allocate_large_buffer_after_smaller_buffers() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        {
            let mut buffers = vec![];
            while let Ok(buffer) = allocator.try_allocate_buffer(8192) {
                buffers.push(buffer);
            }
        }
        let buf = allocator.allocate_buffer(1024 * 1024).await;
        assert_eq!(buf.len(), 1024 * 1024);
    }

    #[fuchsia::test]
    async fn test_allocate_at_limits() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let mut buffers = vec![];
        while let Ok(buffer) = allocator.try_allocate_buffer(8192) {
            buffers.push(buffer);
        }
        // Free up a single buffer so the next allocation can proceed.
        buffers.pop();
        let buf = allocator.allocate_buffer(8192).await;
        assert_eq!(buf.len(), 8192);
    }
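
    // Illustrative sketch, not part of the original suite: freeing two adjacent buddy blocks lets
    // them coalesce into a block of twice the size, even while the rest of the source stays
    // allocated.
    #[fuchsia::test]
    async fn test_freed_buddies_coalesce() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let mut buffers = vec![];
        while let Ok(buffer) = allocator.try_allocate_buffer(8192) {
            buffers.push(buffer);
        }
        // The first two allocations are adjacent and aligned, so they are buddies of each other.
        let buf_a = buffers.remove(0);
        let buf_b = buffers.remove(0);
        assert_eq!(buf_b.range().start, buf_a.range().end);
        let start = buf_a.range().start;
        // Nothing larger than a single block is free while both buddies are still held.
        assert!(allocator.try_allocate_buffer(16384).is_err());

        std::mem::drop(buf_a);
        std::mem::drop(buf_b);
        // Once both buddies are freed they merge, and a 16 KiB buffer fits in their place.
        let buf = allocator.allocate_buffer(16384).await;
        assert_eq!(buf.range(), start..start + 16384);
    }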

    #[fuchsia::test(threads = 10)]
    async fn test_random_allocs_deallocs() {
        let source = BufferSource::new(16 * 1024 * 1024);
        let bs = 512;
        let allocator = Arc::new(BufferAllocator::new(bs, source));

        join_all((0..10).map(|_| {
            let allocator = allocator.clone();
            fasync::Task::spawn(async move {
                let mut rng = rng();
                enum Op {
                    Alloc,
                    Dealloc,
                }
                let ops = vec![Op::Alloc, Op::Dealloc];
                let mut buffers = vec![];
                for _ in 0..1000 {
                    match ops.choose(&mut rng).unwrap() {
                        Op::Alloc => {
                            // Pick a random order, then a random size within that order's range.
                            let order: usize = rng.random_range(order(1, bs)..order(65536 + 1, bs));
                            let size: usize = rng.random_range(
                                bs * 2_usize.pow(order as u32)..bs * 2_usize.pow(order as u32 + 1),
                            );
                            if let Ok(mut buf) = allocator.try_allocate_buffer(size) {
                                let val = rng.random::<u8>();
                                buf.as_mut_slice().fill(val);
                                for v in buf.as_slice() {
                                    assert_eq!(v, &val);
                                }
                                buffers.push(buf);
                            }
                        }
                        Op::Dealloc if !buffers.is_empty() => {
                            let idx = rng.random_range(0..buffers.len());
                            buffers.remove(idx);
                        }
                        _ => {}
                    };
                }
            })
        }))
        .await;
    }

    #[fuchsia::test]
    async fn test_buffer_refs() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(512, source);

        // Allocate a throwaway buffer first so that the buffer under test doesn't start at offset
        // zero.
        let _buf = allocator.allocate_buffer(512).await;
        let mut buf = allocator.allocate_buffer(4096).await;
        let base = buf.range().start;
        {
            let mut bref = buf.subslice_mut(1000..2000);
            assert_eq!(bref.len(), 1000);
            assert_eq!(bref.range(), base + 1000..base + 2000);
            bref.as_mut_slice().fill(0xbb);
            {
                let mut bref2 = bref.reborrow().subslice_mut(0..100);
                assert_eq!(bref2.len(), 100);
                assert_eq!(bref2.range(), base + 1000..base + 1100);
                bref2.as_mut_slice().fill(0xaa);
            }
            {
                let mut bref2 = bref.reborrow().subslice_mut(900..1000);
                assert_eq!(bref2.len(), 100);
                assert_eq!(bref2.range(), base + 1900..base + 2000);
                bref2.as_mut_slice().fill(0xcc);
            }
            assert_eq!(bref.as_slice()[..100], vec![0xaa; 100]);
            assert_eq!(bref.as_slice()[100..900], vec![0xbb; 800]);

            let bref = bref.subslice_mut(900..);
            assert_eq!(bref.len(), 100);
            assert_eq!(bref.as_slice(), vec![0xcc; 100]);
        }
        {
            let bref = buf.as_ref();
            assert_eq!(bref.len(), 4096);
            assert_eq!(bref.range(), base..base + 4096);
            assert_eq!(bref.as_slice()[0..1000], vec![0x00; 1000]);
            {
                let bref2 = bref.subslice(1000..2000);
                assert_eq!(bref2.len(), 1000);
                assert_eq!(bref2.range(), base + 1000..base + 2000);
                assert_eq!(bref2.as_slice()[..100], vec![0xaa; 100]);
                assert_eq!(bref2.as_slice()[100..900], vec![0xbb; 800]);
                assert_eq!(bref2.as_slice()[900..1000], vec![0xcc; 100]);
            }

            let bref = bref.subslice(2048..);
            assert_eq!(bref.len(), 2048);
            assert_eq!(bref.as_slice(), vec![0x00; 2048]);
        }
    }

    #[fuchsia::test]
    async fn test_buffer_split() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(512, source);

        // Allocate a throwaway buffer first so that the buffer under test doesn't start at offset
        // zero.
        let _buf = allocator.allocate_buffer(512).await;
        let mut buf = allocator.allocate_buffer(4096).await;
        let base = buf.range().start;
        {
            let bref = buf.as_mut();
            let (mut s1, mut s2) = bref.split_at_mut(2048);
            assert_eq!(s1.len(), 2048);
            assert_eq!(s1.range(), base..base + 2048);
            s1.as_mut_slice().fill(0xaa);
            assert_eq!(s2.len(), 2048);
            assert_eq!(s2.range(), base + 2048..base + 4096);
            s2.as_mut_slice().fill(0xbb);
        }
        {
            let bref = buf.as_ref();
            let (s1, s2) = bref.split_at(1);
            let (s2, s3) = s2.split_at(2047);
            let (s3, s4) = s3.split_at(0);
            assert_eq!(s1.len(), 1);
            assert_eq!(s1.range(), base..base + 1);
            assert_eq!(s2.len(), 2047);
            assert_eq!(s2.range(), base + 1..base + 2048);
            assert_eq!(s3.len(), 0);
            assert_eq!(s3.range(), base + 2048..base + 2048);
            assert_eq!(s4.len(), 2048);
            assert_eq!(s4.range(), base + 2048..base + 4096);
            assert_eq!(s1.as_slice(), vec![0xaa; 1]);
            assert_eq!(s2.as_slice(), vec![0xaa; 2047]);
            assert_eq!(s3.as_slice(), &[] as &[u8]);
            assert_eq!(s4.as_slice(), vec![0xbb; 2048]);
        }
    }

    #[fuchsia::test]
    async fn test_blocking_allocation() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = Arc::new(BufferAllocator::new(512, source));

        // Occupy the entire source so that subsequent allocations have to wait.
        let buf1 = allocator.allocate_buffer(512 * 1024).await;
        let buf2 = allocator.allocate_buffer(512 * 1024).await;
        let bufs_dropped = Arc::new(AtomicBool::new(false));

        // A large allocation which can only proceed once both buffers have been dropped.
        let allocator_clone = allocator.clone();
        let bufs_dropped_clone = bufs_dropped.clone();
        let buf3_fut = async move {
            allocator_clone.allocate_buffer(1024 * 1024).await;
            assert!(bufs_dropped_clone.load(Ordering::Relaxed), "Allocation finished early");
        };
        pin_mut!(buf3_fut);

        // A pile of smaller allocations which must also wait for the buffers to be dropped.
        let mut buf_futs = vec![];
        for _ in 0..16 {
            let allocator_clone = allocator.clone();
            let bufs_dropped_clone = bufs_dropped.clone();
            let fut = async move {
                allocator_clone.allocate_buffer(64 * 1024).await;
                assert!(bufs_dropped_clone.load(Ordering::Relaxed), "Allocation finished early");
            };
            buf_futs.push(fut);
        }

        futures::join!(buf3_fut, join_all(buf_futs), async move {
            std::mem::drop(buf1);
            std::mem::drop(buf2);
            bufs_dropped.store(true, Ordering::Relaxed);
        });
    }

    #[fuchsia::test]
    async fn test_clean_entire_transfer_buffer() {
        const BUFFER_SIZE: usize = 4096;
        let source = BufferSource::new(BUFFER_SIZE);
        let allocator = Arc::new(BufferAllocator::new(512, source));

        let mut buf = allocator.allocate_buffer(BUFFER_SIZE).await;
        buf.as_mut_slice().fill(0xaa);
        std::mem::drop(buf);

        allocator.clean_transfer_buffer();
        let buf = allocator.allocate_buffer(BUFFER_SIZE).await;
        assert_eq!(buf.as_slice(), vec![0; BUFFER_SIZE]);
    }

    #[fuchsia::test]
    async fn test_clean_transfer_buffer_around_allocation() {
        let source = BufferSource::new(4096);
        let allocator = Arc::new(BufferAllocator::new(512, source));

        let mut buf1 = allocator.allocate_buffer(1024).await;
        buf1.as_mut_slice().fill(0xaa);
        let mut buf2 = allocator.allocate_buffer(1024).await;
        buf2.as_mut_slice().fill(0xbb);
        assert_eq!(buf2.range().start, 1024);
        let mut buf3 = allocator.allocate_buffer(2048).await;
        buf3.as_mut_slice().fill(0xcc);
        std::mem::drop(buf1);
        std::mem::drop(buf3);

        allocator.clean_transfer_buffer();

        let buf1 = allocator.allocate_buffer(1024).await;
        assert_eq!(buf1.as_slice(), vec![0; 1024]);

        assert_eq!(buf2.as_slice(), vec![0xbb; 1024]);

        let buf3 = allocator.allocate_buffer(2048).await;
        assert_eq!(buf3.as_slice(), vec![0; 2048]);
    }
}