use crate::buffer::{round_down, round_up, Buffer};
use event_listener::{Event, EventListener};
use futures::{Future, FutureExt as _};
use std::collections::BTreeMap;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Mutex;
use std::task::{Context, Poll};

#[cfg(target_os = "fuchsia")]
mod buffer_source {
    use fuchsia_runtime::vmar_root_self;
    use std::ops::Range;
    use zx::AsHandleRef;

    /// A buffer source backed by a VMO which is mapped into the root VMAR for
    /// the lifetime of the source.
    #[derive(Debug)]
    pub struct BufferSource {
        base: *mut u8,
        size: usize,
        vmo: zx::Vmo,
    }

    // SAFETY: The mapping is owned exclusively by this object for its entire
    // lifetime, and mutable access to it is only handed out via `sub_slice`,
    // whose callers must guarantee that live ranges never overlap.
    unsafe impl Send for BufferSource {}
    unsafe impl Sync for BufferSource {}

    impl BufferSource {
        pub fn new(size: usize) -> Self {
            let vmo = zx::Vmo::create(size as u64).unwrap();
            let name = zx::Name::new("transfer-buf").unwrap();
            vmo.set_name(&name).unwrap();
            let flags = zx::VmarFlags::PERM_READ
                | zx::VmarFlags::PERM_WRITE
                | zx::VmarFlags::MAP_RANGE
                | zx::VmarFlags::REQUIRE_NON_RESIZABLE;
            let base = vmar_root_self().map(0, &vmo, 0, size, flags).unwrap() as *mut u8;
            Self { base, size, vmo }
        }

        pub fn size(&self) -> usize {
            self.size
        }

        pub fn vmo(&self) -> &zx::Vmo {
            &self.vmo
        }

        /// # Safety
        ///
        /// The caller must guarantee that `range` is not aliased by any other
        /// live slice; the allocator upholds this by never handing out
        /// overlapping ranges.
        #[allow(clippy::mut_from_ref)]
        pub(super) unsafe fn sub_slice(&self, range: &Range<usize>) -> &mut [u8] {
            assert!(range.start < self.size && range.end <= self.size);
            std::slice::from_raw_parts_mut(self.base.add(range.start), range.end - range.start)
        }

        /// Commits the pages backing `range` so they are allocated up front
        /// rather than on first access.
        pub fn commit_range(&self, range: Range<usize>) -> Result<(), zx::Status> {
            self.vmo.op_range(zx::VmoOp::COMMIT, range.start as u64, range.len() as u64)
        }
    }

    impl Drop for BufferSource {
        fn drop(&mut self) {
            // SAFETY: This unmaps the exact mapping created in `new`, and
            // nothing else unmaps it.
            unsafe {
                let _ = vmar_root_self().unmap(self.base as usize, self.size);
            }
        }
    }
}

#[cfg(not(target_os = "fuchsia"))]
mod buffer_source {
    use std::cell::UnsafeCell;
    use std::ops::Range;
    use std::pin::Pin;

    /// A heap-backed buffer source for host-side builds. The storage is
    /// pinned so that its base address stays stable for the life of the
    /// source.
    #[derive(Debug)]
    pub struct BufferSource {
        data: UnsafeCell<Pin<Vec<u8>>>,
    }

    // SAFETY: Mutable access to the underlying storage is only handed out via
    // `sub_slice`, whose callers must guarantee that live ranges never
    // overlap.
    unsafe impl Sync for BufferSource {}

    impl BufferSource {
        pub fn new(size: usize) -> Self {
            Self { data: UnsafeCell::new(Pin::new(vec![0u8; size])) }
        }

        pub fn size(&self) -> usize {
            unsafe { (&*self.data.get()).len() }
        }

        /// # Safety
        ///
        /// The caller must guarantee that `range` is not aliased by any other
        /// live slice; the allocator upholds this by never handing out
        /// overlapping ranges.
        #[allow(clippy::mut_from_ref)]
        pub(super) unsafe fn sub_slice(&self, range: &Range<usize>) -> &mut [u8] {
            assert!(range.start < self.size() && range.end <= self.size());
            &mut (&mut *self.data.get())[range.start..range.end]
        }
    }
}

pub use buffer_source::BufferSource;

/// Each free list tracks the offsets of the free chunks of one order (chunk
/// size `block_size << order`), kept sorted in ascending order.
type FreeList = Vec<usize>;

#[derive(Debug)]
struct Inner {
    // Indexed by order; each entry is a sorted list of offsets of free chunks
    // of that order.
    free_lists: Vec<FreeList>,
    // Maps the offset of each outstanding allocation to its actual
    // (power-of-two) allocated size, in bytes.
    allocation_map: BTreeMap<usize, usize>,
}

/// A simple buddy allocator for buffers, layered over a single contiguous
/// [`BufferSource`]. Allocations are rounded up to a power-of-two multiple of
/// `block_size`, and freed chunks are merged with their buddies where
/// possible.
#[derive(Debug)]
pub struct BufferAllocator {
    block_size: usize,
    source: BufferSource,
    inner: Mutex<Inner>,
    event: Event,
}

/// Returns the smallest order whose chunks can hold `size` bytes.
fn order(size: usize, block_size: usize) -> usize {
    if size <= block_size {
        return 0;
    }
    let nblocks = round_up(size, block_size) / block_size;
    nblocks.next_power_of_two().trailing_zeros() as usize
}

/// Returns the largest order whose chunks fit entirely within `size` bytes.
/// `size` must be at least one block.
fn order_fit(size: usize, block_size: usize) -> usize {
    assert!(size >= block_size);
    let nblocks = round_up(size, block_size) / block_size;
    if nblocks.is_power_of_two() {
        nblocks.trailing_zeros() as usize
    } else {
        nblocks.next_power_of_two().trailing_zeros() as usize - 1
    }
}

/// Returns the size, in bytes, of chunks of the given order.
fn size_for_order(order: usize, block_size: usize) -> usize {
    block_size * (1 << order)
}

/// Carves the source up into power-of-two chunks, creating the initial free
/// list for each order. Trailing bytes which don't fill a whole block are
/// never used.
fn initial_free_lists(size: usize, block_size: usize) -> Vec<FreeList> {
    let size = round_down(size, block_size);
    assert!(block_size <= size);
    assert!(block_size.is_power_of_two());
    let max_order = order_fit(size, block_size);
    let mut free_lists = Vec::new();
    for _ in 0..=max_order {
        free_lists.push(FreeList::new())
    }
    let mut offset = 0;
    while offset < size {
        let order = order_fit(size - offset, block_size);
        let chunk_size = size_for_order(order, block_size);
        free_lists[order].push(offset);
        offset += chunk_size;
    }
    free_lists
}

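// What `initial_free_lists` produces for an odd-sized source: a minimal
// illustrative sketch (not part of the original suite) mirroring the numbers
// in `test_odd_sized_buffer_source` below. 123 bytes with a block size of 2
// round down to 122 usable bytes, which carve up greedily into
// 64 + 32 + 16 + 8 + 2 bytes, i.e. orders 5, 4, 3, 2 and 0.
#[cfg(test)]
mod initial_free_list_sketch {
    use super::initial_free_lists;

    #[test]
    fn carves_odd_source_into_power_of_two_chunks() {
        let free_lists = initial_free_lists(123, 2);
        assert_eq!(free_lists.len(), 6); // Orders 0 through 5.
        assert_eq!(free_lists[5], vec![0]); // 64 bytes at offset 0.
        assert_eq!(free_lists[4], vec![64]); // 32 bytes.
        assert_eq!(free_lists[3], vec![96]); // 16 bytes.
        assert_eq!(free_lists[2], vec![112]); // 8 bytes.
        assert!(free_lists[1].is_empty()); // No 4-byte chunk is needed.
        assert_eq!(free_lists[0], vec![120]); // 2 bytes; the last odd byte is unused.
    }
}
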
/// A future which resolves to an allocated [`Buffer`] once the allocator has
/// enough free space to satisfy the request.
pub struct BufferFuture<'a> {
    allocator: &'a BufferAllocator,
    size: usize,
    listener: Option<EventListener>,
}

impl<'a> Future for BufferFuture<'a> {
    type Output = Buffer<'a>;

    fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        if let Some(listener) = self.listener.as_mut() {
            futures::ready!(listener.poll_unpin(context));
        }
        loop {
            // Try to allocate; on failure, register a listener for the next
            // free. If the listener fires immediately, a free raced with us,
            // so we simply try again.
            match self.allocator.try_allocate_buffer(self.size) {
                Ok(buffer) => return Poll::Ready(buffer),
                Err(mut listener) => {
                    if listener.poll_unpin(context).is_pending() {
                        self.listener = Some(listener);
                        return Poll::Pending;
                    }
                }
            }
        }
    }
}

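// A minimal sketch (illustrative, not part of the original suite) of the
// handshake that `BufferFuture` builds on: when the pool cannot satisfy a
// request, `try_allocate_buffer` hands back an `EventListener` which fires
// once some other buffer is freed.
#[cfg(test)]
mod listener_sketch {
    use super::{BufferAllocator, BufferSource};

    #[test]
    fn failed_allocation_returns_listener() {
        let allocator = BufferAllocator::new(512, BufferSource::new(1024));
        let buf = match allocator.try_allocate_buffer(1024) {
            Ok(buf) => buf,
            Err(_) => panic!("the pool should start out completely free"),
        };
        // The pool is now exhausted, so we get a listener rather than a buffer.
        assert!(allocator.try_allocate_buffer(512).is_err());
        // Dropping the buffer returns its chunk and notifies listeners, so a
        // retry succeeds.
        drop(buf);
        assert!(allocator.try_allocate_buffer(512).is_ok());
    }
}
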
impl BufferAllocator {
    pub fn new(block_size: usize, source: BufferSource) -> Self {
        let free_lists = initial_free_lists(source.size(), block_size);
        Self {
            block_size,
            source,
            inner: Mutex::new(Inner { free_lists, allocation_map: BTreeMap::new() }),
            event: Event::new(),
        }
    }

    pub fn block_size(&self) -> usize {
        self.block_size
    }

    pub fn buffer_source(&self) -> &BufferSource {
        &self.source
    }

    /// Consumes the allocator and returns the underlying buffer source.
    pub fn take_buffer_source(self) -> BufferSource {
        self.source
    }

    /// Returns a future which resolves to a [`Buffer`] of at least `size`
    /// bytes (the backing allocation is rounded up to a power-of-two multiple
    /// of the block size). The future completes only once enough contiguous
    /// space is free, being re-polled each time a buffer is returned to the
    /// pool. Panics if `size` exceeds the pool size.
    pub fn allocate_buffer(&self, size: usize) -> BufferFuture<'_> {
        BufferFuture { allocator: self, size, listener: None }
    }

    /// Like [`Self::allocate_buffer`], but returns an [`EventListener`]
    /// instead of waiting if there is no space. The listener fires once a
    /// buffer has been freed, at which point the caller should retry.
    pub fn try_allocate_buffer(&self, size: usize) -> Result<Buffer<'_>, EventListener> {
        if size > self.source.size() {
            panic!("Allocation of {} bytes would exceed limit {}", size, self.source.size());
        }
        let mut inner = self.inner.lock().unwrap();
        let requested_order = order(size, self.block_size());
        assert!(requested_order < inner.free_lists.len());
        // Find the smallest free chunk at or above the requested order.
        let mut order = {
            let mut idx = requested_order;
            loop {
                if idx >= inner.free_lists.len() {
                    return Err(self.event.listen());
                }
                if !inner.free_lists[idx].is_empty() {
                    break idx;
                }
                idx += 1;
            }
        };

        let offset = inner.free_lists[order].pop().unwrap();
        // Split the chunk down to the requested order, returning the unused
        // upper half to the free list at each step.
        while order > requested_order {
            order -= 1;
            assert!(inner.free_lists[order].is_empty());
            inner.free_lists[order].push(offset + self.size_for_order(order));
        }

        inner.allocation_map.insert(offset, self.size_for_order(order));
        let range = offset..offset + size;
        log::debug!(range:?, bytes_used = self.size_for_order(order); "Allocated");

        Ok(Buffer::new(unsafe { self.source.sub_slice(&range) }, range, self))
    }

    /// Returns `range` to the pool, merging the freed chunk with its buddies
    /// where possible, and wakes any tasks waiting for space. Called when a
    /// [`Buffer`] is dropped.
    #[doc(hidden)]
    pub(super) fn free_buffer(&self, range: Range<usize>) {
        let mut inner = self.inner.lock().unwrap();
        let mut offset = range.start;
        let size = inner
            .allocation_map
            .remove(&offset)
            .unwrap_or_else(|| panic!("No allocation record found for {:?}", range));
        assert!(range.end - range.start <= size);
        log::debug!(range:?, bytes_used = size; "Freeing");

        let mut order = order(size, self.block_size());
        // Repeatedly merge the freed chunk with its buddy, until either the
        // buddy is not free or we reach the maximum order.
        while order < inner.free_lists.len() - 1 {
            let buddy = self.find_buddy(offset, order);
            let idx = if let Ok(idx) = inner.free_lists[order].binary_search(&buddy) {
                idx
            } else {
                break;
            };
            inner.free_lists[order].remove(idx);
            offset = std::cmp::min(offset, buddy);
            order += 1;
        }

        // Insert the (possibly merged) chunk, keeping the free list sorted.
        let idx = inner.free_lists[order]
            .binary_search(&offset)
            .expect_err(&format!("Unexpectedly found {} in free list {}", offset, order));
        inner.free_lists[order].insert(idx, offset);

        // Wake all waiters; each retries its allocation.
        self.event.notify(usize::MAX);
    }

    fn size_for_order(&self, order: usize) -> usize {
        size_for_order(order, self.block_size)
    }

    // The buddy of a chunk is the neighbour it can merge with: the chunk of
    // the same order whose offset differs only in the chunk-size bit.
    fn find_buddy(&self, offset: usize, order: usize) -> usize {
        offset ^ self.size_for_order(order)
    }
}

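// A minimal sketch (illustrative, not part of the original suite) of the
// buddy arithmetic and of coalescing on free: `find_buddy` flips the
// chunk-size bit of an offset, and freeing a chunk repeatedly merges it with
// its free buddy until the pool is whole again.
#[cfg(test)]
mod buddy_sketch {
    use super::{BufferAllocator, BufferSource};

    #[test]
    fn buddies_pair_up_and_coalesce() {
        let allocator = BufferAllocator::new(512, BufferSource::new(4096));
        // Order-0 chunks pair up as (0, 512), (1024, 1536), ...; order-1
        // chunks (1024 bytes each) pair up as (0, 1024), (2048, 3072), ...
        assert_eq!(allocator.find_buddy(0, 0), 512);
        assert_eq!(allocator.find_buddy(512, 0), 0);
        assert_eq!(allocator.find_buddy(1024, 1), 0);

        {
            let mut buf = match allocator.try_allocate_buffer(512) {
                Ok(buf) => buf,
                Err(_) => panic!("the pool should start out completely free"),
            };
            buf.as_mut_slice().fill(0x5a);
        } // Dropped here; the order-0 chunk merges all the way back up.

        // The whole source has coalesced, so a maximal allocation succeeds.
        assert!(allocator.try_allocate_buffer(4096).is_ok());
    }
}
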
#[cfg(test)]
mod tests {
    use crate::buffer_allocator::{order, BufferAllocator, BufferSource};
    use fuchsia_async as fasync;
    use futures::future::join_all;
    use futures::pin_mut;
    use rand::prelude::SliceRandom;
    use rand::{thread_rng, Rng};
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;

    #[fuchsia::test]
    async fn test_odd_sized_buffer_source() {
        let source = BufferSource::new(123);
        let allocator = BufferAllocator::new(2, source);

        // 123 bytes round down to 122 usable bytes, which carve up into
        // exactly these chunks, so these allocations fill the pool precisely.
        let sizes = vec![64, 32, 16, 8, 2];
        let mut bufs = vec![];
        for size in sizes.iter() {
            bufs.push(allocator.allocate_buffer(*size).await);
        }
        for (expected_size, buf) in sizes.iter().zip(bufs.iter()) {
            assert_eq!(*expected_size, buf.len());
        }
        assert!(allocator.try_allocate_buffer(2).is_err());
    }

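    // A sketch (added for illustration, not from the original suite) of the
    // order arithmetic driving the test above: `order` rounds a request up to
    // the smallest power-of-two number of blocks that can hold it, so any
    // size of at most one block maps to order 0.
    #[test]
    fn test_order_arithmetic_sketch() {
        assert_eq!(order(1, 512), 0);
        assert_eq!(order(512, 512), 0);
        assert_eq!(order(513, 512), 1);
        assert_eq!(order(1024, 512), 1);
        assert_eq!(order(1025, 512), 2);
        assert_eq!(order(65536, 512), 7);
    }
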
    #[fuchsia::test]
    async fn test_allocate_buffer_read_write() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let mut buf = allocator.allocate_buffer(8192).await;
        buf.as_mut_slice().fill(0xaa);
        let mut vec = vec![0u8; 8192];
        vec.copy_from_slice(buf.as_slice());
        assert_eq!(vec, vec![0xaa; 8192]);
    }

    #[fuchsia::test]
    async fn test_allocate_buffer_consecutive_calls_do_not_overlap() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let buf1 = allocator.allocate_buffer(8192).await;
        let buf2 = allocator.allocate_buffer(8192).await;
        assert!(buf1.range().end <= buf2.range().start || buf2.range().end <= buf1.range().start);
    }

    #[fuchsia::test]
    async fn test_allocate_many_buffers() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        for _ in 0..10 {
            let _ = allocator.allocate_buffer(8192).await;
        }
    }

    #[fuchsia::test]
    async fn test_allocate_small_buffers_dont_overlap() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let buf1 = allocator.allocate_buffer(1).await;
        let buf2 = allocator.allocate_buffer(1).await;
        assert!(buf1.range().end <= buf2.range().start || buf2.range().end <= buf1.range().start);
    }

    #[fuchsia::test]
    async fn test_allocate_large_buffer() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let mut buf = allocator.allocate_buffer(1024 * 1024).await;
        assert_eq!(buf.len(), 1024 * 1024);
        buf.as_mut_slice().fill(0xaa);
        let mut vec = vec![0u8; 1024 * 1024];
        vec.copy_from_slice(buf.as_slice());
        assert_eq!(vec, vec![0xaa; 1024 * 1024]);
    }

    #[fuchsia::test]
    async fn test_allocate_large_buffer_after_smaller_buffers() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        {
            // Fill the pool entirely, then drop all of the buffers at once.
            let mut buffers = vec![];
            while let Ok(buffer) = allocator.try_allocate_buffer(8192) {
                buffers.push(buffer);
            }
        }
        // Everything should have coalesced again, so the largest possible
        // allocation should now succeed.
        let buf = allocator.allocate_buffer(1024 * 1024).await;
        assert_eq!(buf.len(), 1024 * 1024);
    }

    #[fuchsia::test]
    async fn test_allocate_at_limits() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(8192, source);

        let mut buffers = vec![];
        while let Ok(buffer) = allocator.try_allocate_buffer(8192) {
            buffers.push(buffer);
        }
        // Drop a single buffer; the allocation should then be able to proceed.
        buffers.pop();
        let buf = allocator.allocate_buffer(8192).await;
        assert_eq!(buf.len(), 8192);
    }

    #[fuchsia::test(threads = 10)]
    async fn test_random_allocs_deallocs() {
        let source = BufferSource::new(16 * 1024 * 1024);
        let bs = 512;
        let allocator = Arc::new(BufferAllocator::new(bs, source));

        join_all((0..10).map(|_| {
            let allocator = allocator.clone();
            fasync::Task::spawn(async move {
                let mut rng = thread_rng();
                enum Op {
                    Alloc,
                    Dealloc,
                }
                let ops = vec![Op::Alloc, Op::Dealloc];
                let mut buffers = vec![];
                for _ in 0..1000 {
                    match ops.choose(&mut rng).unwrap() {
                        Op::Alloc => {
                            // Pick a random order, then a random size within
                            // that order's bucket, so that sizes from a single
                            // block up to just under 128KiB all get coverage.
                            let order: usize = rng.gen_range(order(1, bs)..order(65536 + 1, bs));
                            let size: usize = rng.gen_range(
                                bs * 2_usize.pow(order as u32)..bs * 2_usize.pow(order as u32 + 1),
                            );
                            if let Ok(mut buf) = allocator.try_allocate_buffer(size) {
                                let val = rng.gen::<u8>();
                                buf.as_mut_slice().fill(val);
                                for v in buf.as_slice() {
                                    assert_eq!(v, &val);
                                }
                                buffers.push(buf);
                            }
                        }
                        Op::Dealloc if !buffers.is_empty() => {
                            let idx = rng.gen_range(0..buffers.len());
                            buffers.remove(idx);
                        }
                        _ => {}
                    };
                }
            })
        }))
        .await;
    }

    #[fuchsia::test]
    async fn test_buffer_refs() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(512, source);

        // Allocate a throwaway buffer first so that the buffer under test
        // doesn't start at offset 0, which would let incorrect range
        // arithmetic pass unnoticed.
        let _buf = allocator.allocate_buffer(512).await;
        let mut buf = allocator.allocate_buffer(4096).await;
        let base = buf.range().start;
        {
            let mut bref = buf.subslice_mut(1000..2000);
            assert_eq!(bref.len(), 1000);
            assert_eq!(bref.range(), base + 1000..base + 2000);
            bref.as_mut_slice().fill(0xbb);
            {
                let mut bref2 = bref.reborrow().subslice_mut(0..100);
                assert_eq!(bref2.len(), 100);
                assert_eq!(bref2.range(), base + 1000..base + 1100);
                bref2.as_mut_slice().fill(0xaa);
            }
            {
                let mut bref2 = bref.reborrow().subslice_mut(900..1000);
                assert_eq!(bref2.len(), 100);
                assert_eq!(bref2.range(), base + 1900..base + 2000);
                bref2.as_mut_slice().fill(0xcc);
            }
            assert_eq!(bref.as_slice()[..100], vec![0xaa; 100]);
            assert_eq!(bref.as_slice()[100..900], vec![0xbb; 800]);

            let bref = bref.subslice_mut(900..);
            assert_eq!(bref.len(), 100);
            assert_eq!(bref.as_slice(), vec![0xcc; 100]);
        }
        {
            let bref = buf.as_ref();
            assert_eq!(bref.len(), 4096);
            assert_eq!(bref.range(), base..base + 4096);
            assert_eq!(bref.as_slice()[0..1000], vec![0x00; 1000]);
            {
                let bref2 = bref.subslice(1000..2000);
                assert_eq!(bref2.len(), 1000);
                assert_eq!(bref2.range(), base + 1000..base + 2000);
                assert_eq!(bref2.as_slice()[..100], vec![0xaa; 100]);
                assert_eq!(bref2.as_slice()[100..900], vec![0xbb; 800]);
                assert_eq!(bref2.as_slice()[900..1000], vec![0xcc; 100]);
            }

            let bref = bref.subslice(2048..);
            assert_eq!(bref.len(), 2048);
            assert_eq!(bref.as_slice(), vec![0x00; 2048]);
        }
    }

    #[fuchsia::test]
    async fn test_buffer_split() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = BufferAllocator::new(512, source);

        // As above, allocate a throwaway buffer first so that the buffer
        // under test doesn't start at offset 0.
        let _buf = allocator.allocate_buffer(512).await;
        let mut buf = allocator.allocate_buffer(4096).await;
        let base = buf.range().start;
        {
            let bref = buf.as_mut();
            let (mut s1, mut s2) = bref.split_at_mut(2048);
            assert_eq!(s1.len(), 2048);
            assert_eq!(s1.range(), base..base + 2048);
            s1.as_mut_slice().fill(0xaa);
            assert_eq!(s2.len(), 2048);
            assert_eq!(s2.range(), base + 2048..base + 4096);
            s2.as_mut_slice().fill(0xbb);
        }
        {
            let bref = buf.as_ref();
            let (s1, s2) = bref.split_at(1);
            let (s2, s3) = s2.split_at(2047);
            let (s3, s4) = s3.split_at(0);
            assert_eq!(s1.len(), 1);
            assert_eq!(s1.range(), base..base + 1);
            assert_eq!(s2.len(), 2047);
            assert_eq!(s2.range(), base + 1..base + 2048);
            assert_eq!(s3.len(), 0);
            assert_eq!(s3.range(), base + 2048..base + 2048);
            assert_eq!(s4.len(), 2048);
            assert_eq!(s4.range(), base + 2048..base + 4096);
            assert_eq!(s1.as_slice(), vec![0xaa; 1]);
            assert_eq!(s2.as_slice(), vec![0xaa; 2047]);
            assert_eq!(s3.as_slice(), vec![]);
            assert_eq!(s4.as_slice(), vec![0xbb; 2048]);
        }
    }

    #[fuchsia::test]
    async fn test_blocking_allocation() {
        let source = BufferSource::new(1024 * 1024);
        let allocator = Arc::new(BufferAllocator::new(512, source));

        let buf1 = allocator.allocate_buffer(512 * 1024).await;
        let buf2 = allocator.allocate_buffer(512 * 1024).await;
        let bufs_dropped = Arc::new(AtomicBool::new(false));

        // buf1 and buf2 hold the entire pool, so this allocation can't
        // complete until both are dropped.
        let allocator_clone = allocator.clone();
        let bufs_dropped_clone = bufs_dropped.clone();
        let buf3_fut = async move {
            allocator_clone.allocate_buffer(1024 * 1024).await;
            assert!(bufs_dropped_clone.load(Ordering::Relaxed), "Allocation finished early");
        };
        pin_mut!(buf3_fut);

        // The same goes for these smaller allocations, since the pool is
        // fully occupied until buf1 and buf2 are dropped.
        let mut buf_futs = vec![];
        for _ in 0..16 {
            let allocator_clone = allocator.clone();
            let bufs_dropped_clone = bufs_dropped.clone();
            let fut = async move {
                allocator_clone.allocate_buffer(64 * 1024).await;
                assert!(bufs_dropped_clone.load(Ordering::Relaxed), "Allocation finished early");
            };
            buf_futs.push(fut);
        }

        futures::join!(buf3_fut, join_all(buf_futs), async move {
            std::mem::drop(buf1);
            std::mem::drop(buf2);
            bufs_dropped.store(true, Ordering::Relaxed);
        });
    }
}