use crate::buffer::{Buffer, round_down, round_up};
use event_listener::{Event, EventListener};
use fuchsia_sync::Mutex;
use futures::{Future, FutureExt as _};
use std::collections::BTreeMap;
use std::ops::Range;
use std::pin::Pin;
use std::task::{Context, Poll};

#[cfg(target_os = "fuchsia")]
mod buffer_source {
    use fuchsia_runtime::vmar_root_self;
    use std::ops::Range;
    use std::sync::Arc;
    use zx::{self as zx, AsHandleRef};

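    /// A buffer source backed by a VMO that is mapped into the root VMAR for the lifetime of the
    /// object. All buffers handed out by the allocator are sub-slices of this mapping.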
    #[derive(Debug)]
    pub struct BufferSource {
        base: *mut u8,
        size: usize,
        vmo: Arc<zx::Vmo>,
    }

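    // SAFETY: The mapping is owned by this object and stays valid until `drop` unmaps it; the
    // raw base pointer is only exposed through the accessors below, so the source can be moved
    // to and shared between threads.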
    unsafe impl Send for BufferSource {}
    unsafe impl Sync for BufferSource {}

    impl BufferSource {
        pub fn new(size: usize) -> Self {
            let vmo = Arc::new(zx::Vmo::create(size as u64).unwrap());
            let name = zx::Name::new("transfer-buf").unwrap();
            vmo.set_name(&name).unwrap();
            let flags = zx::VmarFlags::PERM_READ
                | zx::VmarFlags::PERM_WRITE
                | zx::VmarFlags::MAP_RANGE
                | zx::VmarFlags::REQUIRE_NON_RESIZABLE;
            let base = vmar_root_self().map(0, &vmo, 0, size, flags).unwrap() as *mut u8;
            Self { base, size, vmo }
        }

        pub fn slice(&self) -> *mut [u8] {
            std::ptr::slice_from_raw_parts_mut(self.base, self.size)
        }

        pub fn size(&self) -> usize {
            self.size
        }

        pub fn vmo(&self) -> &Arc<zx::Vmo> {
            &self.vmo
        }

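        /// # Safety
        ///
        /// This returns a mutable slice from a shared reference, so the caller must guarantee
        /// that `range` is not aliased by any other live slice.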
        #[allow(clippy::mut_from_ref)]
        pub(super) unsafe fn sub_slice(&self, range: &Range<usize>) -> &mut [u8] {
            assert!(range.start < self.size && range.end <= self.size);
            unsafe {
                std::slice::from_raw_parts_mut(self.base.add(range.start), range.end - range.start)
            }
        }

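        /// Commits the pages covering `range` so they are backed by physical memory up front
        /// rather than being faulted in on first access.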
        pub fn commit_range(&self, range: Range<usize>) -> Result<(), zx::Status> {
            self.vmo.op_range(zx::VmoOp::COMMIT, range.start as u64, range.len() as u64)
        }
    }

    impl Drop for BufferSource {
        fn drop(&mut self) {
            // Errors during unmap are ignored; the mapping is being torn down anyway.
            unsafe {
                let _ = vmar_root_self().unmap(self.base as usize, self.size);
            }
        }
    }
}

#[cfg(not(target_os = "fuchsia"))]
mod buffer_source {
    use std::cell::UnsafeCell;
    use std::ops::Range;
    use std::pin::Pin;

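    /// A heap-backed buffer source used on host builds, where no VMO is available. The storage
    /// is a pinned `Vec<u8>` so that buffer addresses stay stable for the lifetime of the source.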
    #[derive(Debug)]
    pub struct BufferSource {
        data: UnsafeCell<Pin<Vec<u8>>>,
    }

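    // SAFETY: The allocator only hands out non-overlapping sub-slices, so concurrent access via
    // `sub_slice` never aliases.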
    unsafe impl Sync for BufferSource {}

    impl BufferSource {
        pub fn new(size: usize) -> Self {
            Self { data: UnsafeCell::new(Pin::new(vec![0u8; size])) }
        }

        pub fn size(&self) -> usize {
            unsafe { (&*self.data.get()).len() }
        }

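        /// # Safety
        ///
        /// As with the Fuchsia implementation, the caller must guarantee that `range` is not
        /// aliased by any other live slice.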
        #[allow(clippy::mut_from_ref)]
        pub(super) unsafe fn sub_slice(&self, range: &Range<usize>) -> &mut [u8] {
            assert!(range.start < self.size() && range.end <= self.size());
            unsafe { &mut (&mut *self.data.get())[range.start..range.end] }
        }
    }
}

pub use buffer_source::BufferSource;

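/// The offsets of the free blocks of a single order, kept sorted so that `free_buffer` can find
/// a block's buddy with a binary search.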
type FreeList = Vec<usize>;

#[derive(Debug)]
struct Inner {
    // One free list per order; the list at index `n` holds the offsets of free blocks of
    // `block_size << n` bytes.
    free_lists: Vec<FreeList>,
    // Maps the offset of each outstanding allocation to its allocated size.
    allocation_map: BTreeMap<usize, usize>,
}

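/// `BufferAllocator` hands out [`Buffer`]s backed by a single [`BufferSource`] using a buddy
/// allocation scheme: reservations are power-of-two multiples of `block_size`, blocks are
/// coalesced with their free buddies when returned, and waiters are woken whenever a buffer is
/// freed.
///
/// A minimal usage sketch (the sizes are illustrative):
///
/// ```ignore
/// let allocator = BufferAllocator::new(512, BufferSource::new(1024 * 1024));
/// let mut buf = allocator.allocate_buffer(4096).await;
/// buf.as_mut_slice().fill(0xab);
/// ```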
#[derive(Debug)]
pub struct BufferAllocator {
    block_size: usize,
    source: BufferSource,
    inner: Mutex<Inner>,
    event: Event,
}

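/// Returns the smallest order whose block size (`block_size << order`) is at least `size`.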
fn order(size: usize, block_size: usize) -> usize {
    if size <= block_size {
        return 0;
    }
    let nblocks = round_up(size, block_size) / block_size;
    nblocks.next_power_of_two().trailing_zeros() as usize
}

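/// Returns the largest order whose block size fits within `size` (callers pass sizes which are
/// multiples of `block_size`).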
fn order_fit(size: usize, block_size: usize) -> usize {
    assert!(size >= block_size);
    let nblocks = round_up(size, block_size) / block_size;
    if nblocks.is_power_of_two() {
        nblocks.trailing_zeros() as usize
    } else {
        nblocks.next_power_of_two().trailing_zeros() as usize - 1
    }
}

fn size_for_order(order: usize, block_size: usize) -> usize {
    block_size * (1 << (order as u32))
}

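/// Carves the source into the largest power-of-two blocks that fit, front to back, and seeds the
/// free lists with them.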
fn initial_free_lists(size: usize, block_size: usize) -> Vec<FreeList> {
    let size = round_down(size, block_size);
    assert!(block_size <= size);
    assert!(block_size.is_power_of_two());
    let max_order = order_fit(size, block_size);
    let mut free_lists = Vec::new();
    for _ in 0..max_order + 1 {
        free_lists.push(FreeList::new())
    }
    let mut offset = 0;
    while offset < size {
        let order = order_fit(size - offset, block_size);
        let size = size_for_order(order, block_size);
        free_lists[order].push(offset);
        offset += size;
    }
    free_lists
}

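/// A future which resolves to an allocated [`Buffer`] once the allocator has enough free space
/// to satisfy the request.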
pub struct BufferFuture<'a> {
    allocator: &'a BufferAllocator,
    size: usize,
    listener: Option<EventListener>,
}

impl<'a> Future for BufferFuture<'a> {
    type Output = Buffer<'a>;

    fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        if let Some(listener) = self.listener.as_mut() {
            futures::ready!(listener.poll_unpin(context));
        }
        // Loop because a freed buffer may be claimed by another task between the notification and
        // our retry; in that case, register a new listener and try again.
        loop {
            match self.allocator.try_allocate_buffer(self.size) {
                Ok(buffer) => return Poll::Ready(buffer),
                Err(mut listener) => {
                    if listener.poll_unpin(context).is_pending() {
                        self.listener = Some(listener);
                        return Poll::Pending;
                    }
                }
            }
        }
    }
}

impl BufferAllocator {
    pub fn new(block_size: usize, source: BufferSource) -> Self {
        let free_lists = initial_free_lists(source.size(), block_size);
        Self {
            block_size,
            source,
            inner: Mutex::new(Inner { free_lists, allocation_map: BTreeMap::new() }),
            event: Event::new(),
        }
    }

    pub fn block_size(&self) -> usize {
        self.block_size
    }

    pub fn buffer_source(&self) -> &BufferSource {
        &self.source
    }

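    /// Consumes the allocator and returns the underlying [`BufferSource`].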
    pub fn take_buffer_source(self) -> BufferSource {
        self.source
    }

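    /// Returns a future which resolves to a [`Buffer`] of at least `size` bytes, waiting if
    /// necessary until enough contiguous space is free. The underlying reservation is rounded up
    /// to a power-of-two multiple of the block size and is released when the buffer is dropped.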
    pub fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
        BufferFuture { allocator: self, size, listener: None }
    }

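    /// Like [`BufferAllocator::allocate_buffer`], but returns an [`EventListener`] instead of
    /// waiting if the allocation cannot be satisfied right now; the listener fires the next time
    /// a buffer is freed. Panics if `size` exceeds the size of the buffer source.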
    pub fn try_allocate_buffer(&self, size: usize) -> Result<Buffer<'_>, EventListener> {
        if size > self.source.size() {
            panic!("Allocation of {} bytes would exceed limit {}", size, self.source.size());
        }
        let mut inner = self.inner.lock();
        let requested_order = order(size, self.block_size());
        assert!(requested_order < inner.free_lists.len());
        // Find the smallest order at or above the requested order which has a free block.
        let mut order = {
            let mut idx = requested_order;
            loop {
                if idx >= inner.free_lists.len() {
                    return Err(self.event.listen());
                }
                if !inner.free_lists[idx].is_empty() {
                    break idx;
                }
                idx += 1;
            }
        };

        // Split the block down to the requested order, returning the unused halves to the free
        // lists.
        let offset = inner.free_lists[order].pop().unwrap();
        while order > requested_order {
            order -= 1;
            assert!(inner.free_lists[order].is_empty());
            inner.free_lists[order].push(offset + self.size_for_order(order));
        }

        inner.allocation_map.insert(offset, self.size_for_order(order));
        let range = offset..offset + size;
        log::debug!(range:?, bytes_used = self.size_for_order(order); "Allocated");

        Ok(Buffer::new(unsafe { self.source.sub_slice(&range) }, range, self))
    }

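    /// Returns an allocated range to the free lists, coalescing it with free buddies, and wakes
    /// any tasks waiting for an allocation. Called when a [`Buffer`] is dropped.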
    #[doc(hidden)]
    pub(super) fn free_buffer(&self, range: Range<usize>) {
        let mut inner = self.inner.lock();
        let mut offset = range.start;
        let size = inner
            .allocation_map
            .remove(&offset)
            .unwrap_or_else(|| panic!("No allocation record found for {:?}", range));
        assert!(range.end - range.start <= size);
        log::debug!(range:?, bytes_used = size; "Freeing");

        // Merge the block with its free buddies, moving up one order each time.
        let mut order = order(size, self.block_size());
        while order < inner.free_lists.len() - 1 {
            let buddy = self.find_buddy(offset, order);
            let idx = if let Ok(idx) = inner.free_lists[order].binary_search(&buddy) {
                idx
            } else {
                break;
            };
            inner.free_lists[order].remove(idx);
            offset = std::cmp::min(offset, buddy);
            order += 1;
        }

        let idx = inner.free_lists[order]
            .binary_search(&offset)
            .expect_err(&format!("Unexpectedly found {} in free list {}", offset, order));
        inner.free_lists[order].insert(idx, offset);

        // Wake any tasks waiting for an allocation.
        self.event.notify(usize::MAX);
    }

    fn size_for_order(&self, order: usize) -> usize {
        size_for_order(order, self.block_size)
    }

    fn find_buddy(&self, offset: usize, order: usize) -> usize {
        // A block's buddy is at the offset with the bit corresponding to this order's size
        // flipped.
        offset ^ self.size_for_order(order)
    }
}

#[cfg(test)]
mod tests {
    use crate::buffer_allocator::{BufferAllocator, BufferSource, order};
    use fuchsia_async as fasync;
    use futures::future::join_all;
    use futures::pin_mut;
    use rand::seq::IndexedRandom;
    use rand::{Rng, rng};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    #[fuchsia::test]
    async fn test_odd_sized_buffer_source() {
        let source = BufferSource::new(123);
        let allocator = BufferAllocator::new(2, source);

        // The allocator should use as much of the source as possible: the 122 usable bytes get
        // carved into blocks of 64, 32, 16, 8 and 2 bytes.
        let sizes = vec![64, 32, 16, 8, 2];
        let mut bufs = vec![];
        for size in sizes.iter() {
            bufs.push(allocator.allocate_buffer(*size).await);
        }
        for (expected_size, buf) in sizes.iter().zip(bufs.iter()) {
            assert_eq!(*expected_size, buf.len());
        }
        assert!(allocator.try_allocate_buffer(2).is_err());
    }

    #[fuchsia::test]
    async fn test_allocate_buffer_read_write() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let mut buf = allocator.allocate_buffer(8192).await;
        buf.as_mut_slice().fill(0xaa);
        let mut vec = vec![0u8; 8192];
        vec.copy_from_slice(buf.as_slice());
        assert_eq!(vec, vec![0xaau8; 8192]);
    }

    #[fuchsia::test]
    async fn test_allocate_buffer_consecutive_calls_do_not_overlap() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let buf1 = allocator.allocate_buffer(8192).await;
        let buf2 = allocator.allocate_buffer(8192).await;
        assert!(buf1.range().end <= buf2.range().start || buf2.range().end <= buf1.range().start);
    }

    #[fuchsia::test]
    async fn test_allocate_many_buffers() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        for _ in 0..10 {
            let _ = allocator.allocate_buffer(8192).await;
        }
    }

    #[fuchsia::test]
    async fn test_allocate_small_buffers_dont_overlap() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let buf1 = allocator.allocate_buffer(1).await;
        let buf2 = allocator.allocate_buffer(1).await;
        assert!(buf1.range().end <= buf2.range().start || buf2.range().end <= buf1.range().start);
    }

    #[fuchsia::test]
    async fn test_allocate_large_buffer() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let mut buf = allocator.allocate_buffer(1024 * 1024).await;
        assert_eq!(buf.len(), 1024 * 1024);
        buf.as_mut_slice().fill(0xaa);
        let mut vec = vec![0u8; 1024 * 1024];
        vec.copy_from_slice(buf.as_slice());
        assert_eq!(vec, vec![0xaau8; 1024 * 1024]);
    }

    #[fuchsia::test]
    async fn test_allocate_large_buffer_after_smaller_buffers() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        {
            let mut buffers = vec![];
            while let Ok(buffer) = allocator.try_allocate_buffer(8192) {
                buffers.push(buffer);
            }
        }
        let buf = allocator.allocate_buffer(1024 * 1024).await;
        assert_eq!(buf.len(), 1024 * 1024);
    }

    #[fuchsia::test]
    async fn test_allocate_at_limits() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let mut buffers = vec![];
        while let Ok(buffer) = allocator.try_allocate_buffer(8192) {
            buffers.push(buffer);
        }
        // Dropping one buffer frees up space for exactly one more allocation.
        buffers.pop();
        let buf = allocator.allocate_buffer(8192).await;
        assert_eq!(buf.len(), 8192);
    }

    #[fuchsia::test(threads = 10)]
    async fn test_random_allocs_deallocs() {
        let source = BufferSource::new(16 * 1024 * 1024);
        let bs = 512;
        let allocator = Arc::new(BufferAllocator::new(bs, source));

        join_all((0..10).map(|_| {
            let allocator = allocator.clone();
            fasync::Task::spawn(async move {
                let mut rng = rng();
                enum Op {
                    Alloc,
                    Dealloc,
                }
                let ops = vec![Op::Alloc, Op::Dealloc];
                let mut buffers = vec![];
                for _ in 0..1000 {
                    match ops.choose(&mut rng).unwrap() {
                        Op::Alloc => {
                            // Pick a random order so that all block sizes get exercised, then a
                            // random size within that order.
                            let order: usize = rng.random_range(order(1, bs)..order(65536 + 1, bs));
                            let size: usize = rng.random_range(
                                bs * 2_usize.pow(order as u32)..bs * 2_usize.pow(order as u32 + 1),
                            );
                            if let Ok(mut buf) = allocator.try_allocate_buffer(size) {
                                let val = rng.random::<u8>();
                                buf.as_mut_slice().fill(val);
                                for v in buf.as_slice() {
                                    assert_eq!(v, &val);
                                }
                                buffers.push(buf);
                            }
                        }
                        Op::Dealloc if !buffers.is_empty() => {
                            let idx = rng.random_range(0..buffers.len());
                            buffers.remove(idx);
                        }
                        _ => {}
                    };
                }
            })
        }))
        .await;
    }

    #[fuchsia::test]
    async fn test_buffer_refs() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(512, source);

        // Allocate a small buffer first so that the buffer under test starts at a nonzero offset.
        let _buf = allocator.allocate_buffer(512).await;
        let mut buf = allocator.allocate_buffer(4096).await;
        let base = buf.range().start;
        {
            let mut bref = buf.subslice_mut(1000..2000);
            assert_eq!(bref.len(), 1000);
            assert_eq!(bref.range(), base + 1000..base + 2000);
            bref.as_mut_slice().fill(0xbb);
            {
                let mut bref2 = bref.reborrow().subslice_mut(0..100);
                assert_eq!(bref2.len(), 100);
                assert_eq!(bref2.range(), base + 1000..base + 1100);
                bref2.as_mut_slice().fill(0xaa);
            }
            {
                let mut bref2 = bref.reborrow().subslice_mut(900..1000);
                assert_eq!(bref2.len(), 100);
                assert_eq!(bref2.range(), base + 1900..base + 2000);
                bref2.as_mut_slice().fill(0xcc);
            }
            assert_eq!(bref.as_slice()[..100], vec![0xaa; 100]);
            assert_eq!(bref.as_slice()[100..900], vec![0xbb; 800]);

            let bref = bref.subslice_mut(900..);
            assert_eq!(bref.len(), 100);
            assert_eq!(bref.as_slice(), vec![0xcc; 100]);
        }
        {
            let bref = buf.as_ref();
            assert_eq!(bref.len(), 4096);
            assert_eq!(bref.range(), base..base + 4096);
            assert_eq!(bref.as_slice()[0..1000], vec![0x00; 1000]);
            {
                let bref2 = bref.subslice(1000..2000);
                assert_eq!(bref2.len(), 1000);
                assert_eq!(bref2.range(), base + 1000..base + 2000);
                assert_eq!(bref2.as_slice()[..100], vec![0xaa; 100]);
                assert_eq!(bref2.as_slice()[100..900], vec![0xbb; 800]);
                assert_eq!(bref2.as_slice()[900..1000], vec![0xcc; 100]);
            }

            let bref = bref.subslice(2048..);
            assert_eq!(bref.len(), 2048);
            assert_eq!(bref.as_slice(), vec![0x00; 2048]);
        }
    }

    #[fuchsia::test]
    async fn test_buffer_split() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(512, source);

        // Allocate a small buffer first so that the buffer under test starts at a nonzero offset.
        let _buf = allocator.allocate_buffer(512).await;
        let mut buf = allocator.allocate_buffer(4096).await;
        let base = buf.range().start;
        {
            let bref = buf.as_mut();
            let (mut s1, mut s2) = bref.split_at_mut(2048);
            assert_eq!(s1.len(), 2048);
            assert_eq!(s1.range(), base..base + 2048);
            s1.as_mut_slice().fill(0xaa);
            assert_eq!(s2.len(), 2048);
            assert_eq!(s2.range(), base + 2048..base + 4096);
            s2.as_mut_slice().fill(0xbb);
        }
        {
            let bref = buf.as_ref();
            let (s1, s2) = bref.split_at(1);
            let (s2, s3) = s2.split_at(2047);
            let (s3, s4) = s3.split_at(0);
            assert_eq!(s1.len(), 1);
            assert_eq!(s1.range(), base..base + 1);
            assert_eq!(s2.len(), 2047);
            assert_eq!(s2.range(), base + 1..base + 2048);
            assert_eq!(s3.len(), 0);
            assert_eq!(s3.range(), base + 2048..base + 2048);
            assert_eq!(s4.len(), 2048);
            assert_eq!(s4.range(), base + 2048..base + 4096);
            assert_eq!(s1.as_slice(), vec![0xaa; 1]);
            assert_eq!(s2.as_slice(), vec![0xaa; 2047]);
            assert_eq!(s3.as_slice(), &[] as &[u8]);
            assert_eq!(s4.as_slice(), vec![0xbb; 2048]);
        }
    }

    #[fuchsia::test]
    async fn test_blocking_allocation() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = Arc::new(BufferAllocator::new(512, source));

        let buf1 = allocator.allocate_buffer(512 * 1024).await;
        let buf2 = allocator.allocate_buffer(512 * 1024).await;
        let bufs_dropped = Arc::new(AtomicBool::new(false));

        // The source is now exhausted, so this allocation can only complete after both of the
        // buffers above have been dropped.
        let allocator_clone = allocator.clone();
        let bufs_dropped_clone = bufs_dropped.clone();
        let buf3_fut = async move {
            allocator_clone.allocate_buffer(1024 * 1024).await;
            assert!(bufs_dropped_clone.load(Ordering::Relaxed), "Allocation finished early");
        };
        pin_mut!(buf3_fut);

        // Also queue up a bunch of smaller allocations, which should block as well.
        let mut buf_futs = vec![];
        for _ in 0..16 {
            let allocator_clone = allocator.clone();
            let bufs_dropped_clone = bufs_dropped.clone();
            let fut = async move {
                allocator_clone.allocate_buffer(64 * 1024).await;
                assert!(bufs_dropped_clone.load(Ordering::Relaxed), "Allocation finished early");
            };
            buf_futs.push(fut);
        }

        futures::join!(buf3_fut, join_all(buf_futs), async move {
            std::mem::drop(buf1);
            std::mem::drop(buf2);
            bufs_dropped.store(true, Ordering::Relaxed);
        });
    }
}