use crate::buffer::{Buffer, round_down, round_up};
use event_listener::{Event, EventListener};
use fuchsia_sync::Mutex;
use futures::{Future, FutureExt as _};
use std::collections::BTreeMap;
use std::ops::Range;
use std::pin::Pin;
use std::task::{Context, Poll};

#[cfg(target_os = "fuchsia")]
mod buffer_source {
    use fuchsia_runtime::vmar_root_self;
    use std::ops::Range;
    use zx::{self as zx, AsHandleRef};

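    /// A buffer source backed by a [`zx::Vmo`] which is mapped into the root VMAR for the life of
    /// this object.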
    #[derive(Debug)]
    pub struct BufferSource {
        base: *mut u8,
        size: usize,
        vmo: zx::Vmo,
    }

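    // SAFETY: The mapping lives as long as `self`, and avoiding aliased ranges is the
    // responsibility of callers of the unsafe `sub_slice`, so the raw pointer is safe to send and
    // share across threads.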
    unsafe impl Send for BufferSource {}
    unsafe impl Sync for BufferSource {}

    impl BufferSource {
        pub fn new(size: usize) -> Self {
            let vmo = zx::Vmo::create(size as u64).unwrap();
            let name = zx::Name::new("transfer-buf").unwrap();
            vmo.set_name(&name).unwrap();
            let flags = zx::VmarFlags::PERM_READ
                | zx::VmarFlags::PERM_WRITE
                | zx::VmarFlags::MAP_RANGE
                | zx::VmarFlags::REQUIRE_NON_RESIZABLE;
            let base = vmar_root_self().map(0, &vmo, 0, size, flags).unwrap() as *mut u8;
            Self { base, size, vmo }
        }

        pub fn size(&self) -> usize {
            self.size
        }

        pub fn vmo(&self) -> &zx::Vmo {
            &self.vmo
        }

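        /// # Safety
        ///
        /// The caller must guarantee that no other references to `range` exist for the lifetime
        /// of the returned slice.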
        #[allow(clippy::mut_from_ref)]
        pub(super) unsafe fn sub_slice(&self, range: &Range<usize>) -> &mut [u8] {
            assert!(range.start < self.size && range.end <= self.size);
            unsafe {
                std::slice::from_raw_parts_mut(self.base.add(range.start), range.end - range.start)
            }
        }

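        /// Commits the pages in `range` so that they are backed by physical memory.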
        pub fn commit_range(&self, range: Range<usize>) -> Result<(), zx::Status> {
            self.vmo.op_range(zx::VmoOp::COMMIT, range.start as u64, range.len() as u64)
        }
    }

    impl Drop for BufferSource {
        fn drop(&mut self) {
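            // SAFETY: `base` and `size` describe the mapping created in `new`, and no references
            // into it can outlive `self`.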
            unsafe {
                let _ = vmar_root_self().unmap(self.base as usize, self.size);
            }
        }
    }
}

#[cfg(not(target_os = "fuchsia"))]
mod buffer_source {
    use std::cell::UnsafeCell;
    use std::ops::Range;
    use std::pin::Pin;

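    /// A heap-backed buffer source, used when building for the host.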
    #[derive(Debug)]
    pub struct BufferSource {
        data: UnsafeCell<Pin<Vec<u8>>>,
    }

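    // SAFETY: `sub_slice` is unsafe and callers must guarantee that ranges never alias, so the
    // interior mutability is safe to share across threads.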
    unsafe impl Sync for BufferSource {}

    impl BufferSource {
        pub fn new(size: usize) -> Self {
            Self { data: UnsafeCell::new(Pin::new(vec![0u8; size])) }
        }

        pub fn size(&self) -> usize {
            unsafe { (&*self.data.get()).len() }
        }

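        /// # Safety
        ///
        /// The caller must guarantee that no other references to `range` exist for the lifetime
        /// of the returned slice.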
        #[allow(clippy::mut_from_ref)]
        pub(super) unsafe fn sub_slice(&self, range: &Range<usize>) -> &mut [u8] {
            assert!(range.start < self.size() && range.end <= self.size());
            unsafe { &mut (&mut *self.data.get())[range.start..range.end] }
        }
    }
}

pub use buffer_source::BufferSource;

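// Each free list holds the offsets of the free blocks of a single order (i.e. of size
// `block_size << order`), kept sorted so that buddies can be located by binary search.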
type FreeList = Vec<usize>;

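// The mutable state of the allocator: one free list per order, plus a map from the offset of
// each live allocation to its quantized (power-of-two) size.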
#[derive(Debug)]
struct Inner {
    free_lists: Vec<FreeList>,
    allocation_map: BTreeMap<usize, usize>,
}

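/// A simple buddy allocator for data buffers backed by a [`BufferSource`].
///
/// Allocations are quantized to power-of-two multiples of the block size, and freed blocks are
/// coalesced with their buddies where possible. Tasks waiting for space are woken through
/// `event` whenever a buffer is freed.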
#[derive(Debug)]
pub struct BufferAllocator {
    block_size: usize,
    source: BufferSource,
    inner: Mutex<Inner>,
    event: Event,
}

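// Returns the smallest order whose block size can hold `size` bytes. For example, with a
// 512-byte block size: order(1) == order(512) == 0, order(513) == 1 and order(2048) == 2.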
fn order(size: usize, block_size: usize) -> usize {
    if size <= block_size {
        return 0;
    }
    let nblocks = round_up(size, block_size) / block_size;
    nblocks.next_power_of_two().trailing_zeros() as usize
}

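// Returns the largest order whose block size fits entirely within `size` bytes, i.e. it rounds
// down to a power-of-two number of blocks. For example, with a 512-byte block size,
// order_fit(1536, 512) == 1 (1024 bytes fit; 2048 would not).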
fn order_fit(size: usize, block_size: usize) -> usize {
    assert!(size >= block_size);
    let nblocks = round_up(size, block_size) / block_size;
    if nblocks.is_power_of_two() {
        nblocks.trailing_zeros() as usize
    } else {
        nblocks.next_power_of_two().trailing_zeros() as usize - 1
    }
}

fn size_for_order(order: usize, block_size: usize) -> usize {
    block_size * (1 << (order as u32))
}

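// Carves the source up into the fewest possible power-of-two-sized chunks, largest first, and
// seeds the free lists with them. For example, a 123-byte source with a block size of 2 yields
// chunks of 64, 32, 16, 8 and 2 bytes (122 usable bytes in total).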
fn initial_free_lists(size: usize, block_size: usize) -> Vec<FreeList> {
    let size = round_down(size, block_size);
    assert!(block_size <= size);
    assert!(block_size.is_power_of_two());
    let max_order = order_fit(size, block_size);
    let mut free_lists = Vec::new();
    for _ in 0..max_order + 1 {
        free_lists.push(FreeList::new())
    }
    let mut offset = 0;
    while offset < size {
        let order = order_fit(size - offset, block_size);
        let size = size_for_order(order, block_size);
        free_lists[order].push(offset);
        offset += size;
    }
    free_lists
}

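/// The future returned by [`BufferAllocator::allocate_buffer`]. Resolves once the allocation
/// succeeds, which may require waiting for other buffers to be dropped.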
pub struct BufferFuture<'a> {
    allocator: &'a BufferAllocator,
    size: usize,
    listener: Option<EventListener>,
}

impl<'a> Future for BufferFuture<'a> {
    type Output = Buffer<'a>;

    fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        if let Some(listener) = self.listener.as_mut() {
            futures::ready!(listener.poll_unpin(context));
        }
        // Loop because the listener returned on an allocation failure might already have fired,
        // in which case we should retry the allocation immediately.
        loop {
            match self.allocator.try_allocate_buffer(self.size) {
                Ok(buffer) => return Poll::Ready(buffer),
                Err(mut listener) => {
                    if listener.poll_unpin(context).is_pending() {
                        self.listener = Some(listener);
                        return Poll::Pending;
                    }
                }
            }
        }
    }
}

impl BufferAllocator {
    pub fn new(block_size: usize, source: BufferSource) -> Self {
        let free_lists = initial_free_lists(source.size(), block_size);
        Self {
            block_size,
            source,
            inner: Mutex::new(Inner { free_lists, allocation_map: BTreeMap::new() }),
            event: Event::new(),
        }
    }

    pub fn block_size(&self) -> usize {
        self.block_size
    }

    pub fn buffer_source(&self) -> &BufferSource {
        &self.source
    }

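    /// Consumes the allocator and returns the underlying buffer source.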
    pub fn take_buffer_source(self) -> BufferSource {
        self.source
    }

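    /// Allocates a [`Buffer`] of `size` bytes; the space reserved is quantized up to a
    /// power-of-two multiple of the block size. The returned future resolves once enough space
    /// is free; buffers are released by dropping them.
    ///
    /// Panics if the allocation can never be satisfied because it exceeds the source size.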
    pub fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
        BufferFuture { allocator: self, size, listener: None }
    }

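    /// Like [`Self::allocate_buffer`], but returns an [`EventListener`] if the allocation cannot
    /// be satisfied right now; the listener fires when a buffer is freed.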
    pub fn try_allocate_buffer(&self, size: usize) -> Result<Buffer<'_>, EventListener> {
        if size > self.source.size() {
            panic!("Allocation of {} bytes would exceed limit {}", size, self.source.size());
        }
        let mut inner = self.inner.lock();
        let requested_order = order(size, self.block_size());
        assert!(requested_order < inner.free_lists.len());
        // Find the smallest order with a free block that can satisfy the request.
        let mut order = {
            let mut idx = requested_order;
            loop {
                if idx >= inner.free_lists.len() {
                    return Err(self.event.listen());
                }
                if !inner.free_lists[idx].is_empty() {
                    break idx;
                }
                idx += 1;
            }
        };

        let offset = inner.free_lists[order].pop().unwrap();
        // Split the block down to the requested order, returning the unused upper halves to the
        // appropriate free lists.
        while order > requested_order {
            order -= 1;
            assert!(inner.free_lists[order].is_empty());
            inner.free_lists[order].push(offset + self.size_for_order(order));
        }

        inner.allocation_map.insert(offset, self.size_for_order(order));
        let range = offset..offset + size;
        log::debug!(range:?, bytes_used = self.size_for_order(order); "Allocated");

        Ok(Buffer::new(unsafe { self.source.sub_slice(&range) }, range, &self))
    }

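    /// Returns `range` to the free pool, coalescing with free buddies where possible. This is
    /// not intended to be called directly; buffers return their ranges here when dropped.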
    #[doc(hidden)]
    pub(super) fn free_buffer(&self, range: Range<usize>) {
        let mut inner = self.inner.lock();
        let mut offset = range.start;
        let size = inner
            .allocation_map
            .remove(&offset)
            .unwrap_or_else(|| panic!("No allocation record found for {:?}", range));
        assert!(range.end - range.start <= size);
        log::debug!(range:?, bytes_used = size; "Freeing");

        let mut order = order(size, self.block_size());
        // Merge the block with its buddy, moving up one order at a time, until either the buddy
        // is not free or we reach the maximum order.
        while order < inner.free_lists.len() - 1 {
            let buddy = self.find_buddy(offset, order);
            let idx = if let Ok(idx) = inner.free_lists[order].binary_search(&buddy) {
                idx
            } else {
                break;
            };
            inner.free_lists[order].remove(idx);
            offset = std::cmp::min(offset, buddy);
            order += 1;
        }

        let idx = inner.free_lists[order]
            .binary_search(&offset)
            .expect_err(&format!("Unexpectedly found {} in free list {}", offset, order));
        inner.free_lists[order].insert(idx, offset);

        // Wake all waiters; each one retries its allocation and re-registers if it still cannot
        // be satisfied.
        self.event.notify(usize::MAX);
    }

    fn size_for_order(&self, order: usize) -> usize {
        size_for_order(order, self.block_size)
    }

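    // A block's buddy is found by flipping the bit corresponding to the block's size. For
    // example, with a 512-byte block size, the order-0 buddy of offset 512 is 0 and vice versa.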
    fn find_buddy(&self, offset: usize, order: usize) -> usize {
        offset ^ self.size_for_order(order)
    }
}

#[cfg(test)]
mod tests {
    use crate::buffer_allocator::{BufferAllocator, BufferSource, order};
    use fuchsia_async as fasync;
    use futures::future::join_all;
    use futures::pin_mut;
    use rand::seq::IndexedRandom;
    use rand::{Rng, rng};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    #[fuchsia::test]
    async fn test_odd_sized_buffer_source() {
        let source = BufferSource::new(123);
        let allocator = BufferAllocator::new(2, source);

        // 123 bytes rounds down to 122 usable bytes, and these sizes sum to exactly 122, so
        // nothing should be left over afterwards.
        let sizes = vec![64, 32, 16, 8, 2];
        let mut bufs = vec![];
        for size in sizes.iter() {
            bufs.push(allocator.allocate_buffer(*size).await);
        }
        for (expected_size, buf) in sizes.iter().zip(bufs.iter()) {
            assert_eq!(*expected_size, buf.len());
        }
        assert!(allocator.try_allocate_buffer(2).is_err());
    }

    #[fuchsia::test]
    async fn test_allocate_buffer_read_write() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let mut buf = allocator.allocate_buffer(8192).await;
        buf.as_mut_slice().fill(0xaa);
        let mut vec = vec![0u8; 8192];
        vec.copy_from_slice(buf.as_slice());
        assert_eq!(vec, vec![0xaa; 8192]);
    }

    #[fuchsia::test]
    async fn test_allocate_buffer_consecutive_calls_do_not_overlap() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let buf1 = allocator.allocate_buffer(8192).await;
        let buf2 = allocator.allocate_buffer(8192).await;
        assert!(buf1.range().end <= buf2.range().start || buf2.range().end <= buf1.range().start);
    }

    #[fuchsia::test]
    async fn test_allocate_many_buffers() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        for _ in 0..10 {
            let _ = allocator.allocate_buffer(8192).await;
        }
    }

    #[fuchsia::test]
    async fn test_allocate_small_buffers_dont_overlap() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let buf1 = allocator.allocate_buffer(1).await;
        let buf2 = allocator.allocate_buffer(1).await;
        assert!(buf1.range().end <= buf2.range().start || buf2.range().end <= buf1.range().start);
    }

    #[fuchsia::test]
    async fn test_allocate_large_buffer() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let mut buf = allocator.allocate_buffer(1024 * 1024).await;
        assert_eq!(buf.len(), 1024 * 1024);
        buf.as_mut_slice().fill(0xaa);
        let mut vec = vec![0u8; 1024 * 1024];
        vec.copy_from_slice(buf.as_slice());
        assert_eq!(vec, vec![0xaa; 1024 * 1024]);
    }

    #[fuchsia::test]
    async fn test_allocate_large_buffer_after_smaller_buffers() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        {
            // Fill the allocator with small buffers; they are all freed when this scope ends.
            let mut buffers = vec![];
            while let Ok(buffer) = allocator.try_allocate_buffer(8192) {
                buffers.push(buffer);
            }
        }
        let buf = allocator.allocate_buffer(1024 * 1024).await;
        assert_eq!(buf.len(), 1024 * 1024);
    }

    #[fuchsia::test]
    async fn test_allocate_at_limits() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let mut buffers = vec![];
        while let Ok(buffer) = allocator.try_allocate_buffer(8192) {
            buffers.push(buffer);
        }
        // Free up a single buffer so that the allocation below can proceed.
        buffers.pop();
        let buf = allocator.allocate_buffer(8192).await;
        assert_eq!(buf.len(), 8192);
    }

    #[fuchsia::test(threads = 10)]
    async fn test_random_allocs_deallocs() {
        let source = BufferSource::new(16 * 1024 * 1024);
        let bs = 512;
        let allocator = Arc::new(BufferAllocator::new(bs, source));

        join_all((0..10).map(|_| {
            let allocator = allocator.clone();
            fasync::Task::spawn(async move {
                let mut rng = rng();
                enum Op {
                    Alloc,
                    Dealloc,
                }
                let ops = vec![Op::Alloc, Op::Dealloc];
                let mut buffers = vec![];
                for _ in 0..1000 {
                    match ops.choose(&mut rng).unwrap() {
                        Op::Alloc => {
                            // Pick a random order (so that all block sizes are exercised roughly
                            // evenly), then a random size within that order, and verify the
                            // buffer's contents after filling it.
                            let order: usize = rng.random_range(order(1, bs)..order(65536 + 1, bs));
                            let size: usize = rng.random_range(
                                bs * 2_usize.pow(order as u32)..bs * 2_usize.pow(order as u32 + 1),
                            );
                            if let Ok(mut buf) = allocator.try_allocate_buffer(size) {
                                let val = rng.random::<u8>();
                                buf.as_mut_slice().fill(val);
                                for v in buf.as_slice() {
                                    assert_eq!(v, &val);
                                }
                                buffers.push(buf);
                            }
                        }
                        Op::Dealloc if !buffers.is_empty() => {
                            let idx = rng.random_range(0..buffers.len());
                            buffers.remove(idx);
                        }
                        _ => {}
                    };
                }
            })
        }))
        .await;
    }

    #[fuchsia::test]
    async fn test_buffer_refs() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(512, source);

        // Allocate a small buffer first so that `buf` starts at a nonzero offset.
        let _buf = allocator.allocate_buffer(512).await;
        let mut buf = allocator.allocate_buffer(4096).await;
        let base = buf.range().start;
        {
            let mut bref = buf.subslice_mut(1000..2000);
            assert_eq!(bref.len(), 1000);
            assert_eq!(bref.range(), base + 1000..base + 2000);
            bref.as_mut_slice().fill(0xbb);
            {
                let mut bref2 = bref.reborrow().subslice_mut(0..100);
                assert_eq!(bref2.len(), 100);
                assert_eq!(bref2.range(), base + 1000..base + 1100);
                bref2.as_mut_slice().fill(0xaa);
            }
            {
                let mut bref2 = bref.reborrow().subslice_mut(900..1000);
                assert_eq!(bref2.len(), 100);
                assert_eq!(bref2.range(), base + 1900..base + 2000);
                bref2.as_mut_slice().fill(0xcc);
            }
            assert_eq!(bref.as_slice()[..100], vec![0xaa; 100]);
            assert_eq!(bref.as_slice()[100..900], vec![0xbb; 800]);

            let bref = bref.subslice_mut(900..);
            assert_eq!(bref.len(), 100);
            assert_eq!(bref.as_slice(), vec![0xcc; 100]);
        }
        {
            let bref = buf.as_ref();
            assert_eq!(bref.len(), 4096);
            assert_eq!(bref.range(), base..base + 4096);
            assert_eq!(bref.as_slice()[0..1000], vec![0x00; 1000]);
            {
                let bref2 = bref.subslice(1000..2000);
                assert_eq!(bref2.len(), 1000);
                assert_eq!(bref2.range(), base + 1000..base + 2000);
                assert_eq!(bref2.as_slice()[..100], vec![0xaa; 100]);
                assert_eq!(bref2.as_slice()[100..900], vec![0xbb; 800]);
                assert_eq!(bref2.as_slice()[900..1000], vec![0xcc; 100]);
            }

            let bref = bref.subslice(2048..);
            assert_eq!(bref.len(), 2048);
            assert_eq!(bref.as_slice(), vec![0x00; 2048]);
        }
    }

    #[fuchsia::test]
    async fn test_buffer_split() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(512, source);

        // Allocate a small buffer first so that `buf` starts at a nonzero offset.
        let _buf = allocator.allocate_buffer(512).await;
        let mut buf = allocator.allocate_buffer(4096).await;
        let base = buf.range().start;
        {
            let bref = buf.as_mut();
            let (mut s1, mut s2) = bref.split_at_mut(2048);
            assert_eq!(s1.len(), 2048);
            assert_eq!(s1.range(), base..base + 2048);
            s1.as_mut_slice().fill(0xaa);
            assert_eq!(s2.len(), 2048);
            assert_eq!(s2.range(), base + 2048..base + 4096);
            s2.as_mut_slice().fill(0xbb);
        }
        {
            let bref = buf.as_ref();
            let (s1, s2) = bref.split_at(1);
            let (s2, s3) = s2.split_at(2047);
            let (s3, s4) = s3.split_at(0);
            assert_eq!(s1.len(), 1);
            assert_eq!(s1.range(), base..base + 1);
            assert_eq!(s2.len(), 2047);
            assert_eq!(s2.range(), base + 1..base + 2048);
            assert_eq!(s3.len(), 0);
            assert_eq!(s3.range(), base + 2048..base + 2048);
            assert_eq!(s4.len(), 2048);
            assert_eq!(s4.range(), base + 2048..base + 4096);
            assert_eq!(s1.as_slice(), vec![0xaa; 1]);
            assert_eq!(s2.as_slice(), vec![0xaa; 2047]);
            assert_eq!(s3.as_slice(), &[] as &[u8]);
            assert_eq!(s4.as_slice(), vec![0xbb; 2048]);
        }
    }

    #[fuchsia::test]
    async fn test_blocking_allocation() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = Arc::new(BufferAllocator::new(512, source));

        let buf1 = allocator.allocate_buffer(512 * 1024).await;
        let buf2 = allocator.allocate_buffer(512 * 1024).await;
        let bufs_dropped = Arc::new(AtomicBool::new(false));

        // The allocator is now exhausted, so none of the allocations below can complete until
        // both buf1 and buf2 have been dropped.
        let allocator_clone = allocator.clone();
        let bufs_dropped_clone = bufs_dropped.clone();
        let buf3_fut = async move {
            allocator_clone.allocate_buffer(1024 * 1024).await;
            assert!(bufs_dropped_clone.load(Ordering::Relaxed), "Allocation finished early");
        };
        pin_mut!(buf3_fut);

        let mut buf_futs = vec![];
        for _ in 0..16 {
            let allocator_clone = allocator.clone();
            let bufs_dropped_clone = bufs_dropped.clone();
            let fut = async move {
                allocator_clone.allocate_buffer(64 * 1024).await;
                assert!(bufs_dropped_clone.load(Ordering::Relaxed), "Allocation finished early");
            };
            buf_futs.push(fut);
        }

        futures::join!(buf3_fut, join_all(buf_futs), async move {
            std::mem::drop(buf1);
            std::mem::drop(buf2);
            bufs_dropped.store(true, Ordering::Relaxed);
        });
    }
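
    // A minimal sketch (added for illustration, not part of the original suite) showing buddy
    // coalescing directly: once two adjacent half-size buffers are freed, they merge back into a
    // single block large enough to satisfy a full-size allocation.
    #[fuchsia::test]
    async fn test_freed_buddies_coalesce() {
        let source = BufferSource::new(16384);
        let allocator = BufferAllocator::new(8192, source);

        let buf1 = allocator.allocate_buffer(8192).await;
        let buf2 = allocator.allocate_buffer(8192).await;
        // The source is fully allocated, so a full-size request must fail for now.
        assert!(allocator.try_allocate_buffer(16384).is_err());

        std::mem::drop(buf1);
        std::mem::drop(buf2);
        // Freeing both halves coalesces them into a single 16KiB block.
        let buf = allocator.allocate_buffer(16384).await;
        assert_eq!(buf.len(), 16384);
    }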
}