use crate::buffer::{Buffer, round_down, round_up};
use event_listener::{Event, EventListener};
use fuchsia_sync::Mutex;
use futures::{Future, FutureExt as _};
use std::collections::BTreeMap;
use std::ops::Range;
use std::pin::Pin;
use std::task::{Context, Poll};

#[cfg(target_os = "fuchsia")]
mod buffer_source {
    use fuchsia_runtime::vmar_root_self;
    use std::ops::Range;
    use std::sync::Arc;

    /// A buffer source backed by a VMO, which is mapped into the root VMAR for the lifetime of
    /// the source.
    #[derive(Debug)]
    pub struct BufferSource {
        base: *mut u8,
        size: usize,
        vmo: Arc<zx::Vmo>,
    }

    // Safe because the mapping behind `base` stays valid until the BufferSource is dropped, and
    // mutable access to the contents is only handed out via the unsafe `sub_slice`, which makes
    // the caller responsible for synchronization.
    unsafe impl Send for BufferSource {}
    unsafe impl Sync for BufferSource {}

    impl BufferSource {
        pub fn new(size: usize) -> Self {
            let vmo = Arc::new(zx::Vmo::create(size as u64).unwrap());
            let name = zx::Name::new("transfer-buf").unwrap();
            vmo.set_name(&name).unwrap();
            let flags = zx::VmarFlags::PERM_READ
                | zx::VmarFlags::PERM_WRITE
                | zx::VmarFlags::MAP_RANGE
                | zx::VmarFlags::REQUIRE_NON_RESIZABLE;
            let base = vmar_root_self().map(0, &vmo, 0, size, flags).unwrap() as *mut u8;
            Self { base, size, vmo }
        }

        pub fn slice(&self) -> *mut [u8] {
            std::ptr::slice_from_raw_parts_mut(self.base, self.size)
        }

        pub fn size(&self) -> usize {
            self.size
        }

        pub fn vmo(&self) -> &Arc<zx::Vmo> {
            &self.vmo
        }

        /// # Safety
        ///
        /// The caller must guarantee that no other reference to the same range exists for the
        /// lifetime of the returned slice.
        #[allow(clippy::mut_from_ref)]
        pub(super) unsafe fn sub_slice(&self, range: &Range<usize>) -> &mut [u8] {
            assert!(range.start < self.size && range.end <= self.size);
            unsafe {
                std::slice::from_raw_parts_mut(self.base.add(range.start), range.end - range.start)
            }
        }

        /// Commits the pages backing `range`, so that first access won't incur page faults.
        pub fn commit_range(&self, range: Range<usize>) -> Result<(), zx::Status> {
            self.vmo.op_range(zx::VmoOp::COMMIT, range.start as u64, range.len() as u64)
        }
    }

    impl Drop for BufferSource {
        fn drop(&mut self) {
            // SAFETY: `base` and `size` describe the mapping created in `new`, which is still
            // alive here; nothing else unmaps it.
            unsafe {
                let _ = vmar_root_self().unmap(self.base as usize, self.size);
            }
        }
    }
}

#[cfg(not(target_os = "fuchsia"))]
mod buffer_source {
    use std::cell::UnsafeCell;
    use std::ops::Range;
    use std::pin::Pin;

    /// A heap-backed buffer source for non-Fuchsia (host) builds.
    #[derive(Debug)]
    pub struct BufferSource {
        data: UnsafeCell<Pin<Vec<u8>>>,
    }

    // Safe because no safe method on BufferSource provides interior mutability; `sub_slice`
    // does, but it is unsafe and the caller must guarantee exclusive access to the range.
    unsafe impl Sync for BufferSource {}

    impl BufferSource {
        pub fn new(size: usize) -> Self {
            Self { data: UnsafeCell::new(Pin::new(vec![0u8; size])) }
        }

        pub fn size(&self) -> usize {
            unsafe { (&*self.data.get()).len() }
        }

        /// # Safety
        ///
        /// The caller must guarantee that no other reference to the same range exists for the
        /// lifetime of the returned slice.
        #[allow(clippy::mut_from_ref)]
        pub(super) unsafe fn sub_slice(&self, range: &Range<usize>) -> &mut [u8] {
            assert!(range.start < self.size() && range.end <= self.size());
            unsafe { &mut (&mut *self.data.get())[range.start..range.end] }
        }
    }
}

pub use buffer_source::BufferSource;

/// Each free list holds the sorted offsets of the free chunks in a single size class;
/// `free_lists[order]` tracks chunks of `block_size << order` bytes.
type FreeList = Vec<usize>;

#[derive(Debug)]
struct Inner {
    free_lists: Vec<FreeList>,
    // Maps the offset of each live allocation to the (power-of-two) size actually reserved.
    allocation_map: BTreeMap<usize, usize>,
}

/// BufferAllocator creates Buffer objects backed by a BufferSource, using a simple buddy
/// allocation scheme: the source is carved into power-of-two multiples of the block size, and
/// free chunks are split on allocation and re-merged with their buddies on free.
#[derive(Debug)]
pub struct BufferAllocator {
    block_size: usize,
    source: BufferSource,
    inner: Mutex<Inner>,
    event: Event,
}

/// Returns the smallest order whose chunk size is at least `size` bytes.
fn order(size: usize, block_size: usize) -> usize {
    if size <= block_size {
        return 0;
    }
    let nblocks = round_up(size, block_size) / block_size;
    nblocks.next_power_of_two().trailing_zeros() as usize
}
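
// For example, with a 512-byte block size: order(1, 512) == 0, order(512, 512) == 0,
// order(513, 512) == 1 (two blocks, rounded up to a power of two), and order(4096, 512) == 3
// (eight blocks).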

/// Returns the largest order whose chunk size is no greater than `size` bytes, i.e. the biggest
/// chunk that fits (assuming `size` is a multiple of the block size). Requires
/// `size >= block_size`.
fn order_fit(size: usize, block_size: usize) -> usize {
    assert!(size >= block_size);
    let nblocks = round_up(size, block_size) / block_size;
    if nblocks.is_power_of_two() {
        nblocks.trailing_zeros() as usize
    } else {
        nblocks.next_power_of_two().trailing_zeros() as usize - 1
    }
}
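
// For example, order_fit(1536, 512) == 1: three blocks, and the largest power-of-two chunk
// that fits is two blocks.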

/// Returns the chunk size, in bytes, for a given order.
fn size_for_order(order: usize, block_size: usize) -> usize {
    block_size * (1 << (order as u32))
}

/// Builds the initial free lists by carving `size` bytes up into the largest power-of-two
/// chunks that fit, working from the start of the source.
fn initial_free_lists(size: usize, block_size: usize) -> Vec<FreeList> {
    let size = round_down(size, block_size);
    assert!(block_size <= size);
    assert!(block_size.is_power_of_two());
    let max_order = order_fit(size, block_size);
    let mut free_lists = Vec::new();
    for _ in 0..max_order + 1 {
        free_lists.push(FreeList::new())
    }
    let mut offset = 0;
    while offset < size {
        let order = order_fit(size - offset, block_size);
        let size = size_for_order(order, block_size);
        free_lists[order].push(offset);
        offset += size;
    }
    free_lists
}
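
// For example, a 123-byte source with 2-byte blocks rounds down to 122 usable bytes, which is
// carved into chunks of 64, 32, 16, 8 and 2 bytes (orders 5, 4, 3, 2 and 0); this is the
// decomposition that test_odd_sized_buffer_source below relies on.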

/// A future which resolves to an allocated Buffer, waiting for space to be freed if the
/// allocation cannot currently be satisfied.
pub struct BufferFuture<'a> {
    allocator: &'a BufferAllocator,
    size: usize,
    listener: Option<EventListener>,
}

impl<'a> Future for BufferFuture<'a> {
    type Output = Buffer<'a>;

    fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        if let Some(listener) = self.listener.as_mut() {
            futures::ready!(listener.poll_unpin(context));
        }
        loop {
            // Retry the allocation.  If it still fails, poll the returned listener once before
            // parking: a concurrent free may have fired the event already, in which case the
            // listener is immediately ready and we should retry straight away.
            match self.allocator.try_allocate_buffer(self.size) {
                Ok(buffer) => return Poll::Ready(buffer),
                Err(mut listener) => {
                    if listener.poll_unpin(context).is_pending() {
                        self.listener = Some(listener);
                        return Poll::Pending;
                    }
                }
            }
        }
    }
}

impl BufferAllocator {
    pub fn new(block_size: usize, source: BufferSource) -> Self {
        let free_lists = initial_free_lists(source.size(), block_size);
        Self {
            block_size,
            source,
            inner: Mutex::new(Inner { free_lists, allocation_map: BTreeMap::new() }),
            event: Event::new(),
        }
    }

    pub fn block_size(&self) -> usize {
        self.block_size
    }

    pub fn buffer_source(&self) -> &BufferSource {
        &self.source
    }

    /// Consumes the allocator and returns the underlying BufferSource.
    pub fn take_buffer_source(self) -> BufferSource {
        self.source
    }

    /// Allocates a Buffer with capacity for `size` bytes, waiting until enough space is free to
    /// satisfy the request.  Panics if the request can never be satisfied because it exceeds
    /// the pool's capacity.  The actual reservation is rounded up to a power-of-two multiple of
    /// the block size.
    pub fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
        BufferFuture { allocator: self, size, listener: None }
    }
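
    // A usage sketch (`write_to_device` is a hypothetical consumer of the buffer):
    //
    //     let allocator = BufferAllocator::new(512, BufferSource::new(1024 * 1024));
    //     let mut buf = allocator.allocate_buffer(8192).await;
    //     buf.as_mut_slice().fill(0xaa);
    //     write_to_device(&buf).await;
    //
    // Dropping `buf` returns its chunk to the allocator and wakes any pending allocations.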

    /// Like `allocate_buffer`, but returns `Err(EventListener)` instead of waiting if the
    /// request cannot currently be satisfied; the listener fires the next time a buffer is
    /// freed.  Panics if `size` exceeds the size of the buffer source.
    pub fn try_allocate_buffer(&self, size: usize) -> Result<Buffer<'_>, EventListener> {
        if size > self.source.size() {
            panic!("Allocation of {} bytes would exceed limit {}", size, self.source.size());
        }
        let mut inner = self.inner.lock();
        let requested_order = order(size, self.block_size());
        assert!(requested_order < inner.free_lists.len());
        // Find the smallest non-empty free list that can satisfy the request.
        let mut order = {
            let mut idx = requested_order;
            loop {
                if idx >= inner.free_lists.len() {
                    // Register the listener while the lock is held, so a concurrent free can't
                    // slip in unnoticed between the failed scan and the registration.
                    return Err(self.event.listen());
                }
                if !inner.free_lists[idx].is_empty() {
                    break idx;
                }
                idx += 1;
            }
        };

        let offset = inner.free_lists[order].pop().unwrap();
        // Split the chunk in half until it is as small as possible while still satisfying the
        // request, returning the upper halves to their free lists.
        while order > requested_order {
            order -= 1;
            assert!(inner.free_lists[order].is_empty());
            inner.free_lists[order].push(offset + self.size_for_order(order));
        }

        inner.allocation_map.insert(offset, self.size_for_order(order));
        let range = offset..offset + size;
        log::debug!(range:?, bytes_used = self.size_for_order(order); "Allocated");

        Ok(Buffer::new(unsafe { self.source.sub_slice(&range) }, range, self))
    }
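
    // A non-blocking sketch: fall back to a smaller request rather than waiting.  Dropping the
    // returned listener simply abandons the wait.
    //
    //     let buf = match allocator.try_allocate_buffer(65536) {
    //         Ok(buf) => buf,
    //         Err(_listener) => allocator.allocate_buffer(8192).await,
    //     };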

    /// Returns `range` to the pool, merging the freed chunk with its buddies where possible.
    /// Not intended to be called directly (hence `#[doc(hidden)]`); buffers return their ranges
    /// here when dropped.
    #[doc(hidden)]
    pub(super) fn free_buffer(&self, range: Range<usize>) {
        let mut inner = self.inner.lock();
        let mut offset = range.start;
        let size = inner
            .allocation_map
            .remove(&offset)
            .unwrap_or_else(|| panic!("No allocation record found for {:?}", range));
        assert!(range.end - range.start <= size);
        log::debug!(range:?, bytes_used = size; "Freeing");

        // Merge with buddies until a buddy is absent from its free list (i.e. it is still
        // allocated, or has itself been split), or we've reached the largest order.
        let mut order = order(size, self.block_size());
        while order < inner.free_lists.len() - 1 {
            let buddy = self.find_buddy(offset, order);
            let idx = if let Ok(idx) = inner.free_lists[order].binary_search(&buddy) {
                idx
            } else {
                break;
            };
            inner.free_lists[order].remove(idx);
            offset = std::cmp::min(offset, buddy);
            order += 1;
        }

        let idx = inner.free_lists[order]
            .binary_search(&offset)
            .expect_err(&format!("Unexpectedly found {} in free list {}", offset, order));
        inner.free_lists[order].insert(idx, offset);

        // Notify all waiters, since a merged chunk may satisfy several queued requests.
        self.event.notify(usize::MAX);
    }

    fn size_for_order(&self, order: usize) -> usize {
        size_for_order(order, self.block_size)
    }

    /// Returns the offset of the chunk that the chunk at `offset` would merge with at `order`.
    fn find_buddy(&self, offset: usize, order: usize) -> usize {
        offset ^ self.size_for_order(order)
    }
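
    // Worked example with a 512-byte block size: the buddy of the chunk at offset 0 at order 0
    // is 0 ^ 512 == 512, and the buddy of the chunk at offset 512 is 512 ^ 512 == 0, so freeing
    // both merges them into a single order-1 chunk at offset 0.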
}

#[cfg(test)]
mod tests {
    use crate::buffer_allocator::{BufferAllocator, BufferSource, order};
    use fuchsia_async as fasync;
    use futures::future::join_all;
    use futures::pin_mut;
    use rand::seq::IndexedRandom;
    use rand::{Rng, rng};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    #[fuchsia::test]
    async fn test_odd_sized_buffer_source() {
        let source = BufferSource::new(123);
        let allocator = BufferAllocator::new(2, source);

        // The 123-byte source rounds down to 122 usable bytes: chunks of 64+32+16+8+2.
        let sizes = vec![64, 32, 16, 8, 2];
        let mut bufs = vec![];
        for size in sizes.iter() {
            bufs.push(allocator.allocate_buffer(*size).await);
        }
        for (expected_size, buf) in sizes.iter().zip(bufs.iter()) {
            assert_eq!(*expected_size, buf.len());
        }
        assert!(allocator.try_allocate_buffer(2).is_err());
    }

    #[fuchsia::test]
    async fn test_allocate_buffer_read_write() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let mut buf = allocator.allocate_buffer(8192).await;
        buf.as_mut_slice().fill(0xaa);
        let mut vec = vec![0u8; 8192];
        vec.copy_from_slice(buf.as_slice());
        assert_eq!(vec, vec![0xaa; 8192]);
    }

    #[fuchsia::test]
    async fn test_allocate_buffer_consecutive_calls_do_not_overlap() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let buf1 = allocator.allocate_buffer(8192).await;
        let buf2 = allocator.allocate_buffer(8192).await;
        assert!(buf1.range().end <= buf2.range().start || buf2.range().end <= buf1.range().start);
    }

    #[fuchsia::test]
    async fn test_allocate_many_buffers() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        for _ in 0..10 {
            let _ = allocator.allocate_buffer(8192).await;
        }
    }

    #[fuchsia::test]
    async fn test_allocate_small_buffers_dont_overlap() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let buf1 = allocator.allocate_buffer(1).await;
        let buf2 = allocator.allocate_buffer(1).await;
        assert!(buf1.range().end <= buf2.range().start || buf2.range().end <= buf1.range().start);
    }

    #[fuchsia::test]
    async fn test_allocate_large_buffer() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let mut buf = allocator.allocate_buffer(1024 * 1024).await;
        assert_eq!(buf.len(), 1024 * 1024);
        buf.as_mut_slice().fill(0xaa);
        let mut vec = vec![0u8; 1024 * 1024];
        vec.copy_from_slice(buf.as_slice());
        assert_eq!(vec, vec![0xaa; 1024 * 1024]);
    }

    #[fuchsia::test]
    async fn test_allocate_large_buffer_after_smaller_buffers() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        {
            // Exhaust the pool with small buffers, then drop them all.
            let mut buffers = vec![];
            while let Ok(buffer) = allocator.try_allocate_buffer(8192) {
                buffers.push(buffer);
            }
        }
        // The freed chunks should have coalesced back into one maximal chunk.
        let buf = allocator.allocate_buffer(1024 * 1024).await;
        assert_eq!(buf.len(), 1024 * 1024);
    }

    #[fuchsia::test]
    async fn test_allocate_at_limits() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let mut buffers = vec![];
        while let Ok(buffer) = allocator.try_allocate_buffer(8192) {
            buffers.push(buffer);
        }
        // Drop a single buffer; the next allocation should then succeed.
        buffers.pop();
        let buf = allocator.allocate_buffer(8192).await;
        assert_eq!(buf.len(), 8192);
    }

    #[fuchsia::test(threads = 10)]
    async fn test_random_allocs_deallocs() {
        let source = BufferSource::new(16 * 1024 * 1024);
        let bs = 512;
        let allocator = Arc::new(BufferAllocator::new(bs, source));

        join_all((0..10).map(|_| {
            let allocator = allocator.clone();
            fasync::Task::spawn(async move {
                let mut rng = rng();
                enum Op {
                    Alloc,
                    Dealloc,
                }
                let ops = vec![Op::Alloc, Op::Dealloc];
                let mut buffers = vec![];
                for _ in 0..1000 {
                    match ops.choose(&mut rng).unwrap() {
                        Op::Alloc => {
                            // Pick a random order, then a random size within that order's size
                            // class, so that every size class gets exercised.
                            let order: usize = rng.random_range(order(1, bs)..order(65536 + 1, bs));
                            let size: usize = rng.random_range(
                                bs * 2_usize.pow(order as u32)..bs * 2_usize.pow(order as u32 + 1),
                            );
                            if let Ok(mut buf) = allocator.try_allocate_buffer(size) {
                                let val = rng.random::<u8>();
                                buf.as_mut_slice().fill(val);
                                for v in buf.as_slice() {
                                    assert_eq!(v, &val);
                                }
                                buffers.push(buf);
                            }
                        }
                        Op::Dealloc if !buffers.is_empty() => {
                            let idx = rng.random_range(0..buffers.len());
                            buffers.remove(idx);
                        }
                        _ => {}
                    };
                }
            })
        }))
        .await;
    }

    #[fuchsia::test]
    async fn test_buffer_refs() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(512, source);

        // Allocate a throwaway buffer first so that |buf| doesn't start at offset zero, which
        // would mask errors in the range arithmetic below.
        let _buf = allocator.allocate_buffer(512).await;
        let mut buf = allocator.allocate_buffer(4096).await;
        let base = buf.range().start;
        {
            let mut bref = buf.subslice_mut(1000..2000);
            assert_eq!(bref.len(), 1000);
            assert_eq!(bref.range(), base + 1000..base + 2000);
            bref.as_mut_slice().fill(0xbb);
            {
                let mut bref2 = bref.reborrow().subslice_mut(0..100);
                assert_eq!(bref2.len(), 100);
                assert_eq!(bref2.range(), base + 1000..base + 1100);
                bref2.as_mut_slice().fill(0xaa);
            }
            {
                let mut bref2 = bref.reborrow().subslice_mut(900..1000);
                assert_eq!(bref2.len(), 100);
                assert_eq!(bref2.range(), base + 1900..base + 2000);
                bref2.as_mut_slice().fill(0xcc);
            }
            assert_eq!(bref.as_slice()[..100], vec![0xaa; 100]);
            assert_eq!(bref.as_slice()[100..900], vec![0xbb; 800]);

            let bref = bref.subslice_mut(900..);
            assert_eq!(bref.len(), 100);
            assert_eq!(bref.as_slice(), vec![0xcc; 100]);
        }
        {
            let bref = buf.as_ref();
            assert_eq!(bref.len(), 4096);
            assert_eq!(bref.range(), base..base + 4096);
            assert_eq!(bref.as_slice()[0..1000], vec![0x00; 1000]);
            {
                let bref2 = bref.subslice(1000..2000);
                assert_eq!(bref2.len(), 1000);
                assert_eq!(bref2.range(), base + 1000..base + 2000);
                assert_eq!(bref2.as_slice()[..100], vec![0xaa; 100]);
                assert_eq!(bref2.as_slice()[100..900], vec![0xbb; 800]);
                assert_eq!(bref2.as_slice()[900..1000], vec![0xcc; 100]);
            }

            let bref = bref.subslice(2048..);
            assert_eq!(bref.len(), 2048);
            assert_eq!(bref.as_slice(), vec![0x00; 2048]);
        }
    }

    #[fuchsia::test]
    async fn test_buffer_split() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(512, source);

        // As above, allocate a throwaway buffer so that |buf| doesn't start at offset zero.
        let _buf = allocator.allocate_buffer(512).await;
        let mut buf = allocator.allocate_buffer(4096).await;
        let base = buf.range().start;
        {
            let bref = buf.as_mut();
            let (mut s1, mut s2) = bref.split_at_mut(2048);
            assert_eq!(s1.len(), 2048);
            assert_eq!(s1.range(), base..base + 2048);
            s1.as_mut_slice().fill(0xaa);
            assert_eq!(s2.len(), 2048);
            assert_eq!(s2.range(), base + 2048..base + 4096);
            s2.as_mut_slice().fill(0xbb);
        }
        {
            let bref = buf.as_ref();
            let (s1, s2) = bref.split_at(1);
            let (s2, s3) = s2.split_at(2047);
            let (s3, s4) = s3.split_at(0);
            assert_eq!(s1.len(), 1);
            assert_eq!(s1.range(), base..base + 1);
            assert_eq!(s2.len(), 2047);
            assert_eq!(s2.range(), base + 1..base + 2048);
            assert_eq!(s3.len(), 0);
            assert_eq!(s3.range(), base + 2048..base + 2048);
            assert_eq!(s4.len(), 2048);
            assert_eq!(s4.range(), base + 2048..base + 4096);
            assert_eq!(s1.as_slice(), vec![0xaa; 1]);
            assert_eq!(s2.as_slice(), vec![0xaa; 2047]);
            assert_eq!(s3.as_slice(), &[] as &[u8]);
            assert_eq!(s4.as_slice(), vec![0xbb; 2048]);
        }
    }

    #[fuchsia::test]
    async fn test_blocking_allocation() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = Arc::new(BufferAllocator::new(512, source));

        // Use up the entire pool, then queue allocations which can only complete once the
        // original buffers have been dropped.
        let buf1 = allocator.allocate_buffer(512 * 1024).await;
        let buf2 = allocator.allocate_buffer(512 * 1024).await;
        let bufs_dropped = Arc::new(AtomicBool::new(false));

        let allocator_clone = allocator.clone();
        let bufs_dropped_clone = bufs_dropped.clone();
        let buf3_fut = async move {
            allocator_clone.allocate_buffer(1024 * 1024).await;
            assert!(bufs_dropped_clone.load(Ordering::Relaxed), "Allocation finished early");
        };
        pin_mut!(buf3_fut);

        // Also queue several smaller allocations, to ensure that every waiter is woken.
        let mut buf_futs = vec![];
        for _ in 0..16 {
            let allocator_clone = allocator.clone();
            let bufs_dropped_clone = bufs_dropped.clone();
            let fut = async move {
                allocator_clone.allocate_buffer(64 * 1024).await;
                assert!(bufs_dropped_clone.load(Ordering::Relaxed), "Allocation finished early");
            };
            buf_futs.push(fut);
        }

        futures::join!(buf3_fut, join_all(buf_futs), async move {
            std::mem::drop(buf1);
            std::mem::drop(buf2);
            bufs_dropped.store(true, Ordering::Relaxed);
        });
    }
}