use fuchsia_sync::Mutex;
use futures::task::AtomicWaker;
use std::borrow::Borrow;
use std::collections::VecDeque;
use std::convert::TryInto as _;
use std::fmt::Debug;
use std::io::{Read, Seek, SeekFrom, Write};
use std::mem::{ManuallyDrop, MaybeUninit};
use std::num::TryFromIntError;
use std::ops::{Deref, DerefMut};
use std::ptr::NonNull;
use std::sync::atomic::{self, AtomicBool, AtomicU64};
use std::sync::Arc;
use std::task::Poll;

use explicit::ResultExt as _;
use fidl_fuchsia_hardware_network as netdev;
use fuchsia_runtime::vmar_root_self;
use futures::channel::oneshot::{channel, Receiver, Sender};

use super::{ChainLength, DescId, DescRef, DescRefMut, Descriptors};
use crate::error::{Error, Result};
use crate::session::{BufferLayout, Config, Pending, Port};

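/// A pool of data buffers backed by a single VMO.
///
/// The pool maps the data VMO into the local address space and hands out
/// [`Buffer`]s whose parts point into that mapping; the descriptor and data
/// VMOs it returns from [`Pool::new`] are the ones shared with the device.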
pub(in crate::session) struct Pool {
    base: NonNull<u8>,
    bytes: usize,
    descriptors: Descriptors,
    tx_alloc_state: Mutex<TxAllocState>,
    pub(in crate::session) rx_pending: Pending<Rx>,
    buffer_layout: BufferLayout,
    rx_leases: RxLeaseHandlingState,
}

// SAFETY: `Pool` owns the VMO mapping behind `base`, and the shared mutable
// state is synchronized through `Descriptors` and the `tx_alloc_state` mutex,
// so it is safe to send and share across threads.
unsafe impl Send for Pool {}
unsafe impl Sync for Pool {}

struct TxAllocState {
    // Pending allocation requests, fulfilled in FIFO order as descriptors are
    // returned to the free list.
    requests: VecDeque<TxAllocReq>,
    free_list: TxFreeList,
}

// The free tx descriptors form a singly-linked list threaded through the
// descriptors' `nxt` fields.
struct TxFreeList {
    head: Option<DescId<Tx>>,
    len: u16,
}

impl Pool {
    pub(in crate::session) fn new(config: Config) -> Result<(Arc<Self>, zx::Vmo, zx::Vmo)> {
        let Config { buffer_stride, num_rx_buffers, num_tx_buffers, options, buffer_layout } =
            config;
        let num_buffers = num_rx_buffers.get() + num_tx_buffers.get();
        let (descriptors, descriptors_vmo, tx_free, mut rx_free) =
            Descriptors::new(num_tx_buffers, num_rx_buffers, buffer_stride)?;

        let free_head = tx_free.into_iter().rev().fold(None, |head, mut curr| {
            descriptors.borrow_mut(&mut curr).set_nxt(head);
            Some(curr)
        });

        for rx_desc in rx_free.iter_mut() {
            descriptors.borrow_mut(rx_desc).initialize(
                ChainLength::ZERO,
                0,
                buffer_layout.length.try_into().unwrap(),
                0,
            );
        }

        let tx_alloc_state = TxAllocState {
            free_list: TxFreeList { head: free_head, len: num_tx_buffers.get() },
            requests: VecDeque::new(),
        };

        let size = buffer_stride.get() * u64::from(num_buffers);
        let data_vmo = zx::Vmo::create(size).map_err(|status| Error::Vmo("data", status))?;
        let len = isize::try_from(size).expect("VMO size larger than isize::MAX") as usize;
        let base = NonNull::new(
            vmar_root_self()
                .map(0, &data_vmo, 0, len, zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE)
                .map_err(|status| Error::Map("data", status))? as *mut u8,
        )
        .unwrap();

        Ok((
            Arc::new(Pool {
                base,
                bytes: len,
                descriptors,
                tx_alloc_state: Mutex::new(tx_alloc_state),
                rx_pending: Pending::new(rx_free),
                buffer_layout,
                rx_leases: RxLeaseHandlingState::new_with_flags(options),
            }),
            descriptors_vmo,
            data_vmo,
        ))
    }

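    /// Allocates `num_parts` chained tx descriptors, waiting for descriptors
    /// to be returned to the free list if none are currently available.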
    pub(in crate::session) async fn alloc_tx(
        self: &Arc<Self>,
        num_parts: ChainLength,
    ) -> AllocGuard<Tx> {
        let receiver = {
            let mut state = self.tx_alloc_state.lock();
            match state.free_list.try_alloc(num_parts, &self.descriptors) {
                Some(allocated) => {
                    return AllocGuard::new(allocated, self.clone());
                }
                None => {
                    let (request, receiver) = TxAllocReq::new(num_parts);
                    state.requests.push_back(request);
                    receiver
                }
            }
        };
        // The state lock is released before awaiting; `free_tx` fulfills the
        // queued request once enough descriptors are returned.
        receiver.await.unwrap()
    }

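    /// Allocates a single tx buffer that can hold at least `num_bytes`.
    ///
    /// A minimal usage sketch (hypothetical values; not compiled as a doctest
    /// because the type is crate-internal):
    ///
    /// ```ignore
    /// let mut buffer = pool.alloc_tx_buffer(64).await?;
    /// buffer.write_at(0, &payload)?;
    /// buffer.commit();
    /// ```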
    pub(in crate::session) async fn alloc_tx_buffer(
        self: &Arc<Self>,
        num_bytes: usize,
    ) -> Result<Buffer<Tx>> {
        self.alloc_tx_buffers(num_bytes).await?.next().unwrap()
    }

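    /// Waits until at least one tx buffer of `num_bytes` is available, then
    /// returns an iterator that keeps yielding buffers of that size for as
    /// long as the pool can satisfy them without waiting.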
    pub(in crate::session) async fn alloc_tx_buffers<'a>(
        self: &'a Arc<Self>,
        num_bytes: usize,
    ) -> Result<impl Iterator<Item = Result<Buffer<Tx>>> + 'a> {
        let BufferLayout { min_tx_data, min_tx_head, min_tx_tail, length: buffer_length } =
            self.buffer_layout;
        let tx_head = usize::from(min_tx_head);
        let tx_tail = usize::from(min_tx_tail);
        let total_bytes = num_bytes.max(min_tx_data) + tx_head + tx_tail;
        let num_parts = (total_bytes + buffer_length - 1) / buffer_length;
        let chain_length = ChainLength::try_from(num_parts)?;
        let first = self.alloc_tx(chain_length).await;
        let iter = std::iter::once(first)
            .chain(std::iter::from_fn(move || {
                let mut state = self.tx_alloc_state.lock();
                state
                    .free_list
                    .try_alloc(chain_length, &self.descriptors)
                    .map(|allocated| AllocGuard::new(allocated, self.clone()))
            }))
            .fuse()
            .map(move |mut alloc| {
                alloc.init(num_bytes)?;
                Ok(alloc.into())
            });
        Ok(iter)
    }

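    /// Returns rx descriptors to the pool, resetting each one to cover a full
    /// buffer before it is handed back to the device.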
    pub(in crate::session) fn free_rx(&self, descs: impl IntoIterator<Item = DescId<Rx>>) {
        self.rx_pending.extend(descs.into_iter().map(|mut desc| {
            self.descriptors.borrow_mut(&mut desc).initialize(
                ChainLength::ZERO,
                0,
                self.buffer_layout.length.try_into().unwrap(),
                0,
            );
            desc
        }));
    }

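    /// Returns a tx descriptor chain to the free list and uses the newly
    /// available descriptors to fulfill pending allocation requests in FIFO
    /// order.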
    fn free_tx(self: &Arc<Self>, chain: Chained<DescId<Tx>>) {
        let free_impl = |free_list: &mut TxFreeList, chain: Chained<DescId<Tx>>| {
            let mut descs = chain.into_iter();
            free_list.len += u16::try_from(descs.len()).unwrap();
            let head = descs.next();
            let old_head = std::mem::replace(&mut free_list.head, head);
            let mut tail = descs.last();
            let mut tail_ref = self
                .descriptors
                .borrow_mut(tail.as_mut().unwrap_or_else(|| free_list.head.as_mut().unwrap()));
            tail_ref.set_nxt(old_head);
        };

        let mut state = self.tx_alloc_state.lock();
        let TxAllocState { requests, free_list } = &mut *state;
        let () = free_impl(free_list, chain);

        while let Some(req) = requests.front() {
            match free_list.try_alloc(req.size, &self.descriptors) {
                Some(descs) => {
                    match requests
                        .pop_front()
                        .unwrap()
                        .sender
                        .send(AllocGuard::new(descs, self.clone()))
                        .map_err(ManuallyDrop::new)
                    {
                        Ok(()) => {}
                        Err(mut alloc) => {
                            // The requester is gone: put the descriptors back
                            // on the free list by hand (the state lock is
                            // already held, so the guard's `Drop` must not
                            // run) and release only the pool reference.
                            let AllocGuard { descs, pool } = alloc.deref_mut();
                            let () =
                                free_impl(free_list, std::mem::replace(descs, Chained::empty()));
                            let () = unsafe {
                                std::ptr::drop_in_place(pool);
                            };
                        }
                    }
                }
                None => {
                    if req.sender.is_canceled() {
                        let _cancelled: Option<TxAllocReq> = requests.pop_front();
                        continue;
                    } else {
                        break;
                    }
                }
            }
        }
    }

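    /// Reclaims the descriptor chain of a tx frame that the device has
    /// completed.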
    pub(in crate::session) fn tx_completed(self: &Arc<Self>, head: DescId<Tx>) -> Result<()> {
        let chain = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
        Ok(self.free_tx(chain))
    }

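    /// Wraps the descriptor chain of a received frame into a [`Buffer<Rx>`].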
    pub(in crate::session) fn rx_completed(
        self: &Arc<Self>,
        head: DescId<Rx>,
    ) -> Result<Buffer<Rx>> {
        let descs = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
        let alloc = AllocGuard::new(descs, self.clone());
        Ok(alloc.into())
    }
}

impl Drop for Pool {
    fn drop(&mut self) {
        unsafe {
            vmar_root_self()
                .unmap(self.base.as_ptr() as usize, self.bytes)
                .expect("failed to unmap VMO for Pool")
        }
    }
}

impl TxFreeList {
    fn try_alloc(
        &mut self,
        num_parts: ChainLength,
        descriptors: &Descriptors,
    ) -> Option<Chained<DescId<Tx>>> {
        if u16::from(num_parts.get()) > self.len {
            return None;
        }

        let free_list = std::iter::from_fn(|| -> Option<DescId<Tx>> {
            let new_head = self.head.as_ref().and_then(|head| {
                let nxt = descriptors.borrow(head).nxt();
                nxt.map(|id| unsafe {
                    // SAFETY: `id` was stored on the free list, so no one else
                    // holds a `DescId` for this descriptor.
                    DescId::from_raw(id)
                })
            });
            std::mem::replace(&mut self.head, new_head)
        });
        let allocated = free_list.take(num_parts.get().into()).collect::<Chained<_>>();
        assert_eq!(allocated.len(), num_parts.into());
        self.len -= u16::from(num_parts.get());
        Some(allocated)
    }
}

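/// A data buffer obtained from a [`Pool`].
///
/// A buffer is logically contiguous but may be backed by multiple chained
/// descriptor parts within the pool's VMO mapping.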
pub struct Buffer<K: AllocKind> {
    alloc: AllocGuard<K>,
    parts: Chained<BufferPart>,
    pos: usize,
}

impl<K: AllocKind> Debug for Buffer<K> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { alloc, parts, pos } = self;
        f.debug_struct("Buffer")
            .field("cap", &self.cap())
            .field("alloc", alloc)
            .field("parts", parts)
            .field("pos", pos)
            .finish()
    }
}

impl<K: AllocKind> Buffer<K> {
    pub fn cap(&self) -> usize {
        self.parts.iter().fold(0, |acc, part| acc + part.cap)
    }

    pub fn len(&self) -> usize {
        self.parts.iter().fold(0, |acc, part| acc + part.len)
    }

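    /// Writes `src` into the buffer starting at `offset`.
    ///
    /// Fails with [`Error::TooSmall`] if the write would run past the
    /// buffer's capacity.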
    pub fn write_at(&mut self, offset: usize, src: &[u8]) -> Result<()> {
        if self.cap() < offset + src.len() {
            return Err(Error::TooSmall { size: self.cap(), offset, length: src.len() });
        }
        let mut part_start = 0;
        let mut total = 0;
        for part in self.parts.iter_mut() {
            if offset + total < part_start + part.cap {
                let written = part.write_at(offset + total - part_start, &src[total..])?;
                total += written;
                if total == src.len() {
                    break;
                }
            } else {
                part.len = part.cap;
            }
            part_start += part.cap;
        }
        assert_eq!(total, src.len());
        Ok(())
    }

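    /// Reads `dst.len()` bytes starting at `offset` into `dst`.
    ///
    /// Fails with [`Error::TooSmall`] if the requested range is not within
    /// the buffer's written length.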
    pub fn read_at(&self, offset: usize, dst: &mut [u8]) -> Result<()> {
        if self.len() < offset + dst.len() {
            return Err(Error::TooSmall { size: self.len(), offset, length: dst.len() });
        }
        let mut part_start = 0;
        let mut total = 0;
        for part in self.parts.iter() {
            if offset + total < part_start + part.cap {
                let read = part.read_at(offset + total - part_start, &mut dst[total..])?;
                total += read;
                if total == dst.len() {
                    break;
                }
            }
            part_start += part.cap;
        }
        assert_eq!(total, dst.len());
        Ok(())
    }

    pub fn as_slice_mut(&mut self) -> Option<&mut [u8]> {
        match &mut (*self.parts)[..] {
            [] => Some(&mut []),
            [one] => Some(one.as_slice_mut()),
            _ => None,
        }
    }

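    /// Zero-pads the buffer up to the device's minimum tx data length,
    /// growing the last part into its tail space if necessary.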
    pub(in crate::session) fn pad(&mut self) -> Result<()> {
        let num_parts = self.parts.len();
        let BufferLayout { min_tx_tail, min_tx_data, min_tx_head: _, length: _ } =
            self.alloc.pool.buffer_layout;
        let mut target = min_tx_data;
        for (i, part) in self.parts.iter_mut().enumerate() {
            let grow_cap = if i == num_parts - 1 {
                let descriptor =
                    self.alloc.descriptors().last().expect("descriptor must not be empty");
                let data_length = descriptor.data_length();
                let tail_length = descriptor.tail_length();
                let rest = usize::try_from(data_length).unwrap() + usize::from(tail_length);
                match rest.checked_sub(usize::from(min_tx_tail)) {
                    Some(grow_cap) => Some(grow_cap),
                    None => break,
                }
            } else {
                None
            };
            target -= part.pad(target, grow_cap)?;
        }
        if target != 0 {
            return Err(Error::Pad(min_tx_data, self.cap()));
        }
        Ok(())
    }

    pub(in crate::session) fn leak(mut self) -> DescId<K> {
        let descs = std::mem::replace(&mut self.alloc.descs, Chained::empty());
        descs.into_iter().next().unwrap()
    }

    pub fn frame_type(&self) -> Result<netdev::FrameType> {
        self.alloc.descriptor().frame_type()
    }

    pub fn port(&self) -> Port {
        self.alloc.descriptor().port()
    }
}

impl Buffer<Tx> {
    pub(in crate::session) fn commit(&mut self) {
        for (part, mut descriptor) in self.parts.iter_mut().zip(self.alloc.descriptors_mut()) {
            descriptor.commit(u32::try_from(part.len).unwrap())
        }
    }

    pub fn set_port(&mut self, port: Port) {
        self.alloc.descriptor_mut().set_port(port)
    }

    pub fn set_frame_type(&mut self, frame_type: netdev::FrameType) {
        self.alloc.descriptor_mut().set_frame_type(frame_type)
    }

    pub fn set_tx_flags(&mut self, flags: netdev::TxFlags) {
        self.alloc.descriptor_mut().set_tx_flags(flags)
    }
}

impl Buffer<Rx> {
    pub async fn into_tx(self) -> Buffer<Tx> {
        let Buffer { alloc, parts, pos } = self;
        Buffer { alloc: alloc.into_tx().await, parts, pos }
    }

    pub fn rx_flags(&self) -> Result<netdev::RxFlags> {
        self.alloc.descriptor().rx_flags()
    }
}

impl AllocGuard<Rx> {
    async fn into_tx(mut self) -> AllocGuard<Tx> {
        let mut tx = self.pool.alloc_tx(self.descs.len).await;
        // Swap the underlying descriptors: the rx descriptors that reference
        // the received data become the tx allocation, while the freshly
        // allocated tx descriptors go back to the rx pool when `self` drops,
        // keeping the number of descriptors on each side constant. The
        // transmute is sound because `DescId<Rx>` and `DescId<Tx>` share the
        // same representation.
        std::mem::swap(&mut self.descs.storage, unsafe {
            std::mem::transmute(&mut tx.descs.storage)
        });
        tx
    }
}

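/// A fixed-capacity container for at most `MAX_DESCRIPTOR_CHAIN` elements.
///
/// Used to keep a descriptor chain (and its buffer parts) inline without heap
/// allocation; only the first `len` slots of `storage` are initialized.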
struct Chained<T> {
    storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
    len: ChainLength,
}

impl<T> Deref for Chained<T> {
    type Target = [T];

    fn deref(&self) -> &Self::Target {
        // SAFETY: the first `len` elements are always initialized.
        unsafe { std::mem::transmute(&self.storage[..self.len.into()]) }
    }
}

impl<T> DerefMut for Chained<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        // SAFETY: the first `len` elements are always initialized.
        unsafe { std::mem::transmute(&mut self.storage[..self.len.into()]) }
    }
}

impl<T> Drop for Chained<T> {
    fn drop(&mut self) {
        unsafe {
            // Drop the initialized prefix in place.
            std::ptr::drop_in_place(self.deref_mut());
        }
    }
}

impl<T: Debug> Debug for Chained<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_list().entries(self.iter()).finish()
    }
}

impl<T> Chained<T> {
    #[allow(clippy::uninit_assumed_init)]
    fn empty() -> Self {
        // An array of `MaybeUninit` is allowed to be uninitialized.
        Self { storage: unsafe { MaybeUninit::uninit().assume_init() }, len: ChainLength::ZERO }
    }
}

impl<T> FromIterator<T> for Chained<T> {
    fn from_iter<I: IntoIterator<Item = T>>(elements: I) -> Self {
        let mut result = Self::empty();
        let mut len = 0u8;
        for (idx, e) in elements.into_iter().enumerate() {
            result.storage[idx] = MaybeUninit::new(e);
            len += 1;
        }
        assert!(len > 0);
        result.len = ChainLength::try_from(len).unwrap();
        result
    }
}

impl<T> IntoIterator for Chained<T> {
    type Item = T;
    type IntoIter = ChainedIter<T>;

    fn into_iter(mut self) -> Self::IntoIter {
        let len = self.len;
        // Zero the length before moving the storage out so that `self`'s
        // `Drop` impl won't touch the now-moved elements.
        self.len = ChainLength::ZERO;
        #[allow(clippy::uninit_assumed_init)]
        let storage =
            std::mem::replace(&mut self.storage, unsafe { MaybeUninit::uninit().assume_init() });
        ChainedIter { storage, len, consumed: 0 }
    }
}

struct ChainedIter<T> {
    storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
    len: ChainLength,
    consumed: u8,
}

impl<T> Iterator for ChainedIter<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        if self.consumed < self.len.get() {
            let value = unsafe {
                std::mem::replace(
                    &mut self.storage[usize::from(self.consumed)],
                    MaybeUninit::uninit(),
                )
                .assume_init()
            };
            self.consumed += 1;
            Some(value)
        } else {
            None
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let len = usize::from(self.len.get() - self.consumed);
        (len, Some(len))
    }
}

impl<T> ExactSizeIterator for ChainedIter<T> {}

impl<T> Drop for ChainedIter<T> {
    fn drop(&mut self) {
        unsafe {
            // Drop the elements that were never yielded.
            std::ptr::drop_in_place(std::mem::transmute::<_, &mut [T]>(
                &mut self.storage[self.consumed.into()..self.len.into()],
            ));
        }
    }
}

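/// Owns a chain of descriptors and returns them to the pool when dropped.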
pub(in crate::session) struct AllocGuard<K: AllocKind> {
    descs: Chained<DescId<K>>,
    pool: Arc<Pool>,
}

impl<K: AllocKind> Debug for AllocGuard<K> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { descs, pool: _ } = self;
        f.debug_struct("AllocGuard").field("descs", descs).finish()
    }
}

impl<K: AllocKind> AllocGuard<K> {
    fn new(descs: Chained<DescId<K>>, pool: Arc<Pool>) -> Self {
        Self { descs, pool }
    }

    fn descriptors(&self) -> impl Iterator<Item = DescRef<'_, K>> + '_ {
        self.descs.iter().map(move |desc| self.pool.descriptors.borrow(desc))
    }

    fn descriptors_mut(&mut self) -> impl Iterator<Item = DescRefMut<'_, K>> + '_ {
        let descriptors = &self.pool.descriptors;
        self.descs.iter_mut().map(move |desc| descriptors.borrow_mut(desc))
    }

    fn descriptor(&self) -> DescRef<'_, K> {
        self.descriptors().next().expect("descriptors must not be empty")
    }

    fn descriptor_mut(&mut self) -> DescRefMut<'_, K> {
        self.descriptors_mut().next().expect("descriptors must not be empty")
    }
}

impl AllocGuard<Tx> {
    fn init(&mut self, mut requested_bytes: usize) -> Result<()> {
        let len = self.len();
        let BufferLayout { min_tx_head, min_tx_tail, length: buffer_length, min_tx_data: _ } =
            self.pool.buffer_layout;
        for (mut descriptor, clen) in self.descriptors_mut().zip((0..len).rev()) {
            let chain_length = ChainLength::try_from(clen).unwrap();
            let head_length = if clen + 1 == len { min_tx_head } else { 0 };
            let mut tail_length = if clen == 0 { min_tx_tail } else { 0 };

            let available_bytes =
                u32::try_from(buffer_length - usize::from(head_length) - usize::from(tail_length))
                    .unwrap();

            let data_length = match u32::try_from(requested_bytes) {
                Ok(requested) => {
                    if requested < available_bytes {
                        tail_length = u16::try_from(available_bytes - requested)
                            .ok_checked::<TryFromIntError>()
                            .and_then(|tail_adjustment| tail_length.checked_add(tail_adjustment))
                            .ok_or(Error::TxLength)?;
                    }
                    requested.min(available_bytes)
                }
                Err(TryFromIntError { .. }) => available_bytes,
            };

            requested_bytes -=
                usize::try_from(data_length).unwrap_or_else(|TryFromIntError { .. }| {
                    panic!(
                        "data_length: {} must be smaller than requested_bytes: {}, which is a usize",
                        data_length, requested_bytes
                    )
                });
            descriptor.initialize(chain_length, head_length, data_length, tail_length);
        }
        assert_eq!(requested_bytes, 0);
        Ok(())
    }
}

impl<K: AllocKind> Drop for AllocGuard<K> {
    fn drop(&mut self) {
        if self.is_empty() {
            return;
        }
        K::free(private::Allocation(self));
    }
}

impl<K: AllocKind> Deref for AllocGuard<K> {
    type Target = [DescId<K>];

    fn deref(&self) -> &Self::Target {
        self.descs.deref()
    }
}

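/// A view into a single descriptor's data region within the pool mapping.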
struct BufferPart {
    ptr: *mut u8,
    cap: usize,
    len: usize,
}

impl BufferPart {
    /// # Safety
    ///
    /// `ptr` must point to a region of at least `cap` bytes inside the pool's
    /// mapping that stays valid and unaliased for the lifetime of this
    /// `BufferPart`.
    unsafe fn new(ptr: *mut u8, cap: usize, len: usize) -> Self {
        Self { ptr, cap, len }
    }

    fn read_at(&self, offset: usize, dst: &mut [u8]) -> Result<usize> {
        let available = self.len.checked_sub(offset).ok_or(Error::Index(offset, self.len))?;
        let to_copy = std::cmp::min(available, dst.len());
        unsafe { std::ptr::copy_nonoverlapping(self.ptr.add(offset), dst.as_mut_ptr(), to_copy) }
        Ok(to_copy)
    }

    fn write_at(&mut self, offset: usize, src: &[u8]) -> Result<usize> {
        let available = self.cap.checked_sub(offset).ok_or(Error::Index(offset, self.cap))?;
        let to_copy = std::cmp::min(src.len(), available);
        unsafe { std::ptr::copy_nonoverlapping(src.as_ptr(), self.ptr.add(offset), to_copy) }
        self.len = std::cmp::max(self.len, offset + to_copy);
        Ok(to_copy)
    }

    fn pad(&mut self, target: usize, limit: Option<usize>) -> Result<usize> {
        if target <= self.len {
            return Ok(target);
        }
        if let Some(limit) = limit {
            if target > limit {
                return Err(Error::Pad(target, self.cap));
            }
            if self.cap < target {
                self.cap = target
            }
        }
        let new_len = std::cmp::min(target, self.cap);
        unsafe {
            std::ptr::write_bytes(self.ptr.add(self.len), 0, new_len - self.len);
        }
        self.len = new_len;
        Ok(new_len)
    }

    fn as_slice_mut(&mut self) -> &mut [u8] {
        unsafe { std::slice::from_raw_parts_mut(self.ptr, self.len) }
    }
}

// SAFETY: `BufferPart` points into the pool's VMO mapping and the region it
// references is exclusively owned by the containing `Buffer`, so it can be
// sent to another thread.
unsafe impl Send for BufferPart {}

impl Debug for BufferPart {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let BufferPart { len, cap, ptr } = &self;
        f.debug_struct("BufferPart").field("ptr", ptr).field("len", len).field("cap", cap).finish()
    }
}

impl<K: AllocKind> From<AllocGuard<K>> for Buffer<K> {
    fn from(alloc: AllocGuard<K>) -> Self {
        let AllocGuard { pool, descs: _ } = &alloc;
        let parts: Chained<BufferPart> = alloc
            .descriptors()
            .map(|descriptor| {
                let offset = usize::try_from(descriptor.offset()).unwrap();
                let head_length = usize::from(descriptor.head_length());
                let data_length = usize::try_from(descriptor.data_length()).unwrap();
                let len = match K::REFL {
                    AllocKindRefl::Tx => 0,
                    AllocKindRefl::Rx => data_length,
                };
                assert!(
                    offset + head_length <= pool.bytes,
                    "buffer part starts beyond the end of pool"
                );
                assert!(
                    offset + head_length + data_length <= pool.bytes,
                    "buffer part ends beyond the end of pool"
                );
                unsafe {
                    // SAFETY: the bounds were just checked against the pool
                    // mapping, and the descriptors are exclusively owned by
                    // `alloc`.
                    BufferPart::new(pool.base.as_ptr().add(offset + head_length), data_length, len)
                }
            })
            .collect();
        Self { alloc, parts, pos: 0 }
    }
}

impl<K: AllocKind> Read for Buffer<K> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        self.read_at(self.pos, buf)
            .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidInput, error))?;
        self.pos += buf.len();
        Ok(buf.len())
    }
}

impl Write for Buffer<Tx> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.write_at(self.pos, buf)
            .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidInput, error))?;
        self.pos += buf.len();
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

impl<K: AllocKind> Seek for Buffer<K> {
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        let pos = match pos {
            SeekFrom::Start(pos) => pos,
            SeekFrom::End(offset) => {
                let end = i64::try_from(self.cap())
                    .map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
                u64::try_from(end.wrapping_add(offset)).unwrap()
            }
            SeekFrom::Current(offset) => {
                let current = i64::try_from(self.pos)
                    .map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
                u64::try_from(current.wrapping_add(offset)).unwrap()
            }
        };
        self.pos =
            usize::try_from(pos).map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
        Ok(pos)
    }
}

struct TxAllocReq {
    sender: Sender<AllocGuard<Tx>>,
    size: ChainLength,
}

impl TxAllocReq {
    fn new(size: ChainLength) -> (Self, Receiver<AllocGuard<Tx>>) {
        let (sender, receiver) = channel();
        (TxAllocReq { sender, size }, receiver)
    }
}

mod private {
    use super::{AllocKind, Rx, Tx};
    pub trait Sealed: 'static + Sized {}
    impl Sealed for Rx {}
    impl Sealed for Tx {}

    pub struct Allocation<'a, K: AllocKind>(pub(super) &'a mut super::AllocGuard<K>);
}

pub trait AllocKind: private::Sealed {
    const REFL: AllocKindRefl;

    fn free(alloc: private::Allocation<'_, Self>);
}

pub enum Tx {}
pub enum Rx {}

pub enum AllocKindRefl {
    Tx,
    Rx,
}

impl AllocKindRefl {
    pub(in crate::session) fn as_str(&self) -> &'static str {
        match self {
            AllocKindRefl::Tx => "Tx",
            AllocKindRefl::Rx => "Rx",
        }
    }
}

impl AllocKind for Tx {
    const REFL: AllocKindRefl = AllocKindRefl::Tx;

    fn free(alloc: private::Allocation<'_, Self>) {
        let private::Allocation(AllocGuard { pool, descs }) = alloc;
        pool.free_tx(std::mem::replace(descs, Chained::empty()));
    }
}

impl AllocKind for Rx {
    const REFL: AllocKindRefl = AllocKindRefl::Rx;

    fn free(alloc: private::Allocation<'_, Self>) {
        let private::Allocation(AllocGuard { pool, descs }) = alloc;
        pool.free_rx(std::mem::replace(descs, Chained::empty()));
        pool.rx_leases.rx_complete();
    }
}

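/// Bookkeeping for rx lease watching (see
/// `SessionFlags::RECEIVE_RX_POWER_LEASES`): tracks how many rx buffers have
/// been completed so that a lease can be held until a given frame has been
/// processed.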
pub(in crate::session) struct RxLeaseHandlingState {
    can_watch_rx_leases: AtomicBool,
    // Counts completed (dropped) rx buffers. `RxLeaseWatcher::wait_until`
    // offsets this counter to encode the frame index it is waiting for.
    rx_frame_counter: AtomicU64,
    rx_lease_waker: AtomicWaker,
}

impl RxLeaseHandlingState {
    fn new_with_flags(flags: netdev::SessionFlags) -> Self {
        Self::new_with_enabled(flags.contains(netdev::SessionFlags::RECEIVE_RX_POWER_LEASES))
    }

    fn new_with_enabled(enabled: bool) -> Self {
        Self {
            can_watch_rx_leases: AtomicBool::new(enabled),
            rx_frame_counter: AtomicU64::new(0),
            rx_lease_waker: AtomicWaker::new(),
        }
    }

    fn rx_complete(&self) {
        let Self { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } = self;
        let prev = rx_frame_counter.fetch_add(1, atomic::Ordering::SeqCst);

        // `wait_until` offsets the counter so that it wraps past `u64::MAX`
        // exactly when the awaited frame completes; only then does the waker
        // need to fire.
        if prev == u64::MAX {
            rx_lease_waker.wake();
        }
    }
}

pub(in crate::session) trait RxLeaseHandlingStateContainer {
    fn lease_handling_state(&self) -> &RxLeaseHandlingState;
}

impl<T: Borrow<RxLeaseHandlingState>> RxLeaseHandlingStateContainer for T {
    fn lease_handling_state(&self) -> &RxLeaseHandlingState {
        self.borrow()
    }
}

impl RxLeaseHandlingStateContainer for Arc<Pool> {
    fn lease_handling_state(&self) -> &RxLeaseHandlingState {
        &self.rx_leases
    }
}

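/// Watches rx frame completion on behalf of a lease holder.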
pub(in crate::session) struct RxLeaseWatcher<T> {
    state: T,
}

impl<T: RxLeaseHandlingStateContainer> RxLeaseWatcher<T> {
    /// Creates a new watcher.
    ///
    /// Panics if the session was not created with
    /// `SessionFlags::RECEIVE_RX_POWER_LEASES` or if a watcher already exists.
    pub(in crate::session) fn new(state: T) -> Self {
        assert!(
            state.lease_handling_state().can_watch_rx_leases.swap(false, atomic::Ordering::SeqCst),
            "can't watch rx leases"
        );
        Self { state }
    }

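    /// Waits until `hold_until_frame` rx buffers have been returned to the
    /// pool, i.e. until it is safe to release a lease held for that frame.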
    pub(in crate::session) async fn wait_until(&mut self, hold_until_frame: u64) {
        let RxLeaseHandlingState { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } =
            self.state.lease_handling_state();

        // Offset the counter by the awaited frame index; the guard restores
        // it when the wait finishes or is dropped.
        let prev = rx_frame_counter.fetch_sub(hold_until_frame, atomic::Ordering::SeqCst);
        let _guard = scopeguard::guard((), |()| {
            let _: u64 = rx_frame_counter.fetch_add(hold_until_frame, atomic::Ordering::SeqCst);
        });

        if prev >= hold_until_frame {
            return;
        }
        let threshold = prev.wrapping_sub(hold_until_frame);
        futures::future::poll_fn(|cx| {
            let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
            if v < threshold {
                return Poll::Ready(());
            }
            rx_lease_waker.register(cx.waker());
            // Load again after registering the waker to avoid a lost wakeup
            // between the first check and the registration.
            let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
            if v < threshold {
                return Poll::Ready(());
            }
            Poll::Pending
        })
        .await;
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use assert_matches::assert_matches;
    use fuchsia_async as fasync;
    use futures::future::FutureExt;
    use test_case::test_case;

    use std::collections::HashSet;
    use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize};
    use std::pin::pin;
    use std::task::{Poll, Waker};

    const DEFAULT_MIN_TX_BUFFER_HEAD: u16 = 4;
    const DEFAULT_MIN_TX_BUFFER_TAIL: u16 = 8;
    const DEFAULT_BUFFER_LENGTH: NonZeroUsize = NonZeroUsize::new(64).unwrap();
    const DEFAULT_TX_BUFFERS: NonZeroU16 = NonZeroU16::new(8).unwrap();
    const DEFAULT_RX_BUFFERS: NonZeroU16 = NonZeroU16::new(8).unwrap();
    const MAX_BUFFER_BYTES: usize = DEFAULT_BUFFER_LENGTH.get()
        * netdev::MAX_DESCRIPTOR_CHAIN as usize
        - DEFAULT_MIN_TX_BUFFER_HEAD as usize
        - DEFAULT_MIN_TX_BUFFER_TAIL as usize;

    const SENTINEL_BYTE: u8 = 0xab;
    const WRITE_BYTE: u8 = 1;
    const PAD_BYTE: u8 = 0;

    const DEFAULT_CONFIG: Config = Config {
        buffer_stride: NonZeroU64::new(DEFAULT_BUFFER_LENGTH.get() as u64).unwrap(),
        num_rx_buffers: DEFAULT_RX_BUFFERS,
        num_tx_buffers: DEFAULT_TX_BUFFERS,
        options: netdev::SessionFlags::empty(),
        buffer_layout: BufferLayout {
            length: DEFAULT_BUFFER_LENGTH.get(),
            min_tx_head: DEFAULT_MIN_TX_BUFFER_HEAD,
            min_tx_tail: DEFAULT_MIN_TX_BUFFER_TAIL,
            min_tx_data: 0,
        },
    };

    impl Pool {
        fn new_test_default() -> Arc<Self> {
            let (pool, _descriptors, _data) =
                Pool::new(DEFAULT_CONFIG).expect("failed to create default pool");
            pool
        }

        async fn alloc_tx_checked(self: &Arc<Self>, n: u8) -> AllocGuard<Tx> {
            self.alloc_tx(ChainLength::try_from(n).expect("failed to convert to chain length"))
                .await
        }

        fn alloc_tx_now_or_never(self: &Arc<Self>, n: u8) -> Option<AllocGuard<Tx>> {
            self.alloc_tx_checked(n).now_or_never()
        }

        fn alloc_tx_all(self: &Arc<Self>, n: u8) -> Vec<AllocGuard<Tx>> {
            std::iter::from_fn(|| self.alloc_tx_now_or_never(n)).collect()
        }

        fn alloc_tx_buffer_now_or_never(self: &Arc<Self>, num_bytes: usize) -> Option<Buffer<Tx>> {
            self.alloc_tx_buffer(num_bytes)
                .now_or_never()
                .transpose()
                .expect("invalid arguments for alloc_tx_buffer")
        }

        fn set_min_tx_buffer_length(self: &mut Arc<Self>, length: usize) {
            Arc::get_mut(self).unwrap().buffer_layout.min_tx_data = length;
        }

        fn fill_sentinel_bytes(&mut self) {
            unsafe { std::ptr::write_bytes(self.base.as_ptr(), SENTINEL_BYTE, self.bytes) };
        }
    }

    impl Buffer<Tx> {
        fn check_write_and_pad(&mut self, offset: usize, pad_size: usize) {
            self.write_at(offset, &[WRITE_BYTE][..]).expect("failed to write to self");
            self.pad().expect("failed to pad");
            assert_eq!(self.len(), pad_size);
            const INIT_BYTE: u8 = 42;
            let mut read_buf = vec![INIT_BYTE; pad_size];
            self.read_at(0, &mut read_buf[..]).expect("failed to read from self");
            for (idx, byte) in read_buf.iter().enumerate() {
                if idx < offset {
                    assert_eq!(*byte, SENTINEL_BYTE);
                } else if idx == offset {
                    assert_eq!(*byte, WRITE_BYTE);
                } else {
                    assert_eq!(*byte, PAD_BYTE);
                }
            }
        }
    }

    impl<K, I, T> PartialEq<T> for Chained<DescId<K>>
    where
        K: AllocKind,
        I: ExactSizeIterator<Item = u16>,
        T: Copy + IntoIterator<IntoIter = I>,
    {
        fn eq(&self, other: &T) -> bool {
            let iter = other.into_iter();
            if usize::from(self.len) != iter.len() {
                return false;
            }
            self.iter().zip(iter).all(|(l, r)| l.get() == r)
        }
    }

    impl Debug for TxAllocReq {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            let TxAllocReq { sender: _, size } = self;
            f.debug_struct("TxAllocReq").field("size", &size).finish_non_exhaustive()
        }
    }

    #[test]
    fn alloc_tx_distinct() {
        let pool = Pool::new_test_default();
        let allocated = pool.alloc_tx_all(1);
        assert_eq!(allocated.len(), DEFAULT_TX_BUFFERS.get().into());
        let distinct = allocated
            .iter()
            .map(|alloc| {
                assert_eq!(alloc.descs.len(), 1);
                alloc.descs[0].get()
            })
            .collect::<HashSet<u16>>();
        assert_eq!(allocated.len(), distinct.len());
    }

    #[test]
    fn alloc_tx_free_len() {
        let pool = Pool::new_test_default();
        {
            let allocated = pool.alloc_tx_all(2);
            assert_eq!(
                allocated.iter().fold(0, |acc, a| { acc + a.descs.len() }),
                DEFAULT_TX_BUFFERS.get().into()
            );
            assert_eq!(pool.tx_alloc_state.lock().free_list.len, 0);
        }
        assert_eq!(pool.tx_alloc_state.lock().free_list.len, DEFAULT_TX_BUFFERS.get());
    }

    #[test]
    fn alloc_tx_chain() {
        let pool = Pool::new_test_default();
        let allocated = pool.alloc_tx_all(3);
        assert_eq!(allocated.len(), usize::from(DEFAULT_TX_BUFFERS.get()) / 3);
        assert_matches!(pool.alloc_tx_now_or_never(3), None);
        assert_matches!(pool.alloc_tx_now_or_never(2), Some(a) if a.descs.len() == 2);
    }

    #[test]
    fn alloc_tx_many() {
        let pool = Pool::new_test_default();
        let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
            - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
            - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
        let data_len = usize::try_from(data_len).unwrap();
        let mut buffers = pool
            .alloc_tx_buffers(data_len)
            .now_or_never()
            .expect("failed to alloc")
            .unwrap()
            .collect::<Result<Vec<_>>>()
            .expect("buffer error");
        assert_eq!(buffers.len(), DEFAULT_TX_BUFFERS.get().into());

        // All buffers are in use, so a new batch can't be started.
        assert!(pool.alloc_tx_buffers(data_len).now_or_never().is_none());

        // Freeing one buffer makes exactly one allocation available again.
        assert_matches!(buffers.pop(), Some(_));
        let mut more_buffers =
            pool.alloc_tx_buffers(data_len).now_or_never().expect("failed to alloc").unwrap();
        let buffer = assert_matches!(more_buffers.next(), Some(Ok(b)) => b);
        assert_matches!(more_buffers.next(), None);
        // The iterator is fused, so it stays exhausted even after the buffer
        // is returned to the pool.
        drop(buffer);
        assert_matches!(more_buffers.next(), None);
    }

    #[test]
    fn alloc_tx_after_free() {
        let pool = Pool::new_test_default();
        let mut allocated = pool.alloc_tx_all(1);
        assert_matches!(pool.alloc_tx_now_or_never(2), None);
        {
            let _drained = allocated.drain(..2);
        }
        assert_matches!(pool.alloc_tx_now_or_never(2), Some(a) if a.descs.len() == 2);
    }

    #[test]
    fn blocking_alloc_tx() {
        let mut executor = fasync::TestExecutor::new();
        let pool = Pool::new_test_default();
        let mut allocated = pool.alloc_tx_all(1);
        let alloc_fut = pool.alloc_tx_checked(1);
        let mut alloc_fut = pin!(alloc_fut);
        assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
        assert!(!pool.tx_alloc_state.lock().requests.is_empty());
        let freed = allocated
            .pop()
            .expect("no fulfilled allocations")
            .iter()
            .map(|x| x.get())
            .collect::<Chained<_>>();
        let same_as_freed =
            |descs: &Chained<DescId<Tx>>| descs.iter().map(|x| x.get()).eq(freed.iter().copied());
        assert_matches!(
            &executor.run_until_stalled(&mut alloc_fut),
            Poll::Ready(AllocGuard{ descs, pool: _ }) if same_as_freed(descs)
        );
        assert!(pool.tx_alloc_state.lock().requests.is_empty());
    }

    #[test]
    fn blocking_alloc_tx_cancel_before_free() {
        let mut executor = fasync::TestExecutor::new();
        let pool = Pool::new_test_default();
        let mut allocated = pool.alloc_tx_all(1);
        {
            let alloc_fut = pool.alloc_tx_checked(1);
            let mut alloc_fut = pin!(alloc_fut);
            assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
            assert_matches!(
                pool.tx_alloc_state.lock().requests.as_slices(),
                (&[ref req1, ref req2], &[]) if req1.size.get() == 1 && req2.size.get() == 1
            );
        }
        assert_matches!(
            allocated.pop(),
            Some(AllocGuard { ref descs, pool: ref p })
                if descs == &[DEFAULT_TX_BUFFERS.get() - 1] && Arc::ptr_eq(p, &pool)
        );
        let state = pool.tx_alloc_state.lock();
        assert_eq!(state.free_list.len, 1);
        assert!(state.requests.is_empty());
    }

    #[test]
    fn blocking_alloc_tx_cancel_after_free() {
        let mut executor = fasync::TestExecutor::new();
        let pool = Pool::new_test_default();
        let mut allocated = pool.alloc_tx_all(1);
        {
            let alloc_fut = pool.alloc_tx_checked(1);
            let mut alloc_fut = pin!(alloc_fut);
            assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
            assert_matches!(
                pool.tx_alloc_state.lock().requests.as_slices(),
                (&[ref req1, ref req2], &[]) if req1.size.get() == 1 && req2.size.get() == 1
            );
            assert_matches!(
                allocated.pop(),
                Some(AllocGuard { ref descs, pool: ref p })
                    if descs == &[DEFAULT_TX_BUFFERS.get() - 1] && Arc::ptr_eq(p, &pool)
            );
        }
        let state = pool.tx_alloc_state.lock();
        assert_eq!(state.free_list.len, 1);
        assert!(state.requests.is_empty());
    }

    #[test]
    fn multiple_blocking_alloc_tx_fulfill_order() {
        const TASKS_TOTAL: usize = 3;
        let mut executor = fasync::TestExecutor::new();
        let pool = Pool::new_test_default();
        let mut allocated = pool.alloc_tx_all(1);
        let mut alloc_futs = (1..=TASKS_TOTAL)
            .rev()
            .map(|x| {
                let pool = pool.clone();
                (x, Box::pin(async move { pool.alloc_tx_checked(x.try_into().unwrap()).await }))
            })
            .collect::<Vec<_>>();

        for (idx, (req_size, task)) in alloc_futs.iter_mut().enumerate() {
            assert_matches!(executor.run_until_stalled(task), Poll::Pending);
            assert_eq!(idx + *req_size, TASKS_TOTAL);
        }
        {
            let state = pool.tx_alloc_state.lock();
            // The extra request is the one left behind (already cancelled) by
            // `alloc_tx_all` when it exhausted the pool.
            assert_eq!(state.requests.len(), TASKS_TOTAL + 1);
            let mut requests = state.requests.iter();
            assert!(requests.next().unwrap().sender.is_canceled());
            assert!(requests.all(|req| !req.sender.is_canceled()))
        }

        let mut to_free = Vec::new();
        let mut freed = 0;
        for free_size in (1..=TASKS_TOTAL).rev() {
            let (_req_size, mut task) = alloc_futs.remove(0);
            for _ in 1..free_size {
                freed += 1;
                assert_matches!(
                    allocated.pop(),
                    Some(AllocGuard { ref descs, pool: ref p })
                        if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
                );
                assert_matches!(executor.run_until_stalled(&mut task), Poll::Pending);
            }
            freed += 1;
            assert_matches!(
                allocated.pop(),
                Some(AllocGuard { ref descs, pool: ref p })
                    if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
            );
            match executor.run_until_stalled(&mut task) {
                Poll::Ready(alloc) => {
                    assert_eq!(alloc.len(), free_size);
                    to_free.push(alloc);
                }
                Poll::Pending => panic!("The request should be fulfilled"),
            }
            for (_req_size, task) in alloc_futs.iter_mut() {
                assert_matches!(executor.run_until_stalled(task), Poll::Pending);
            }
        }
        assert!(pool.tx_alloc_state.lock().requests.is_empty());
    }

    #[test]
    fn singleton_tx_layout() {
        let pool = Pool::new_test_default();
        let buffers = std::iter::from_fn(|| {
            let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
                - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
                - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
            pool.alloc_tx_buffer_now_or_never(usize::try_from(data_len).unwrap()).map(|buffer| {
                assert_eq!(buffer.alloc.descriptors().count(), 1);
                let offset = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
                    * u64::from(buffer.alloc[0].get());
                {
                    let descriptor = buffer.alloc.descriptor();
                    assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
                    assert_eq!(descriptor.head_length(), DEFAULT_MIN_TX_BUFFER_HEAD);
                    assert_eq!(descriptor.tail_length(), DEFAULT_MIN_TX_BUFFER_TAIL);
                    assert_eq!(descriptor.data_length(), data_len);
                    assert_eq!(descriptor.offset(), offset);
                }

                assert_eq!(buffer.parts.len(), 1);
                let BufferPart { ptr, len, cap } = buffer.parts[0];
                assert_eq!(len, 0);
                assert_eq!(
                    pool.base.as_ptr().wrapping_add(
                        usize::try_from(offset).unwrap() + usize::from(DEFAULT_MIN_TX_BUFFER_HEAD),
                    ),
                    ptr
                );
                assert_eq!(data_len, u32::try_from(cap).unwrap());
                buffer
            })
        })
        .collect::<Vec<_>>();
        assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()));
    }

    #[test]
    fn chained_tx_layout() {
        let pool = Pool::new_test_default();
        let alloc_len = 4 * DEFAULT_BUFFER_LENGTH.get()
            - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
            - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL);
        let buffers = std::iter::from_fn(|| {
            pool.alloc_tx_buffer_now_or_never(alloc_len).map(|buffer| {
                assert_eq!(buffer.parts.len(), 4);
                for (idx, descriptor) in buffer.alloc.descriptors().enumerate() {
                    let chain_length = ChainLength::try_from(buffer.alloc.len() - idx - 1).unwrap();
                    let head_length = if idx == 0 { DEFAULT_MIN_TX_BUFFER_HEAD } else { 0 };
                    let tail_length = if chain_length == ChainLength::ZERO {
                        DEFAULT_MIN_TX_BUFFER_TAIL
                    } else {
                        0
                    };
                    let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
                        - u32::from(head_length)
                        - u32::from(tail_length);
                    let offset = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
                        * u64::from(buffer.alloc[idx].get());
                    assert_eq!(descriptor.chain_length().unwrap(), chain_length);
                    assert_eq!(descriptor.head_length(), head_length);
                    assert_eq!(descriptor.tail_length(), tail_length);
                    assert_eq!(descriptor.offset(), offset);
                    assert_eq!(descriptor.data_length(), data_len);
                    if chain_length != ChainLength::ZERO {
                        assert_eq!(descriptor.nxt(), Some(buffer.alloc[idx + 1].get()));
                    }

                    let BufferPart { ptr, cap, len } = buffer.parts[idx];
                    assert_eq!(len, 0);
                    assert_eq!(
                        pool.base.as_ptr().wrapping_add(
                            usize::try_from(offset).unwrap() + usize::from(head_length),
                        ),
                        ptr
                    );
                    assert_eq!(data_len, u32::try_from(cap).unwrap());
                }
                buffer
            })
        })
        .collect::<Vec<_>>();
        assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()) / 4);
    }

    #[test]
    fn rx_distinct() {
        let pool = Pool::new_test_default();
        let mut guard = pool.rx_pending.inner.lock();
        let (descs, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
        assert_eq!(descs.len(), usize::from(DEFAULT_RX_BUFFERS.get()));
        let distinct = descs.iter().map(|desc| desc.get()).collect::<HashSet<u16>>();
        assert_eq!(descs.len(), distinct.len());
    }

    #[test]
    fn alloc_rx_layout() {
        let pool = Pool::new_test_default();
        let mut guard = pool.rx_pending.inner.lock();
        let (descs, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
        assert_eq!(descs.len(), usize::from(DEFAULT_RX_BUFFERS.get()));
        for desc in descs.iter() {
            let descriptor = pool.descriptors.borrow(desc);
            let offset =
                u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap() * u64::from(desc.get());
            assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
            assert_eq!(descriptor.head_length(), 0);
            assert_eq!(descriptor.tail_length(), 0);
            assert_eq!(descriptor.offset(), offset);
            assert_eq!(
                descriptor.data_length(),
                u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
            );
        }
    }

    #[test]
    fn buffer_read_at_write_at() {
        let pool = Pool::new_test_default();
        let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
        let mut buffer =
            pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
        // The reserved head and tail bytes push the allocation past a single
        // buffer, so two parts are needed.
        assert_eq!(buffer.parts.len(), 2);
        assert_eq!(buffer.cap(), alloc_bytes);
        let write_buf = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();
        buffer.write_at(0, &write_buf[..]).expect("failed to write into buffer");
        let mut read_buf = [0xff; DEFAULT_BUFFER_LENGTH.get()];
        buffer.read_at(0, &mut read_buf[..]).expect("failed to read from buffer");
        for (idx, byte) in read_buf.iter().enumerate() {
            assert_eq!(*byte, write_buf[idx]);
        }
    }

    #[test]
    fn buffer_read_write_seek() {
        let pool = Pool::new_test_default();
        let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
        let mut buffer =
            pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
        assert_eq!(buffer.parts.len(), 2);
        assert_eq!(buffer.cap(), alloc_bytes);
        let write_buf = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();
        assert_eq!(
            buffer.write(&write_buf[..]).expect("failed to write into buffer"),
            write_buf.len()
        );
        const SEEK_FROM_END: usize = 64;
        const READ_LEN: usize = 12;
        assert_eq!(
            buffer.seek(SeekFrom::End(-i64::try_from(SEEK_FROM_END).unwrap())).unwrap(),
            u64::try_from(buffer.cap() - SEEK_FROM_END).unwrap()
        );
        let mut read_buf = [0xff; READ_LEN];
        assert_eq!(
            buffer.read(&mut read_buf[..]).expect("failed to read from buffer"),
            read_buf.len()
        );
        assert_eq!(&write_buf[..READ_LEN], &read_buf[..]);
    }

    #[test_case(32; "single buffer part")]
    #[test_case(MAX_BUFFER_BYTES; "multiple buffer parts")]
    fn buffer_pad(pad_size: usize) {
        let mut pool = Pool::new_test_default();
        pool.set_min_tx_buffer_length(pad_size);
        for offset in 0..pad_size {
            Arc::get_mut(&mut pool)
                .expect("there are multiple owners of the underlying VMO")
                .fill_sentinel_bytes();
            let mut buffer =
                pool.alloc_tx_buffer_now_or_never(pad_size).expect("failed to allocate buffer");
            buffer.check_write_and_pad(offset, pad_size);
        }
    }

    #[test]
    fn buffer_pad_grow() {
        const BUFFER_PARTS: u8 = 3;
        let mut pool = Pool::new_test_default();
        let pad_size = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
            * u32::from(BUFFER_PARTS)
            - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
            - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
        pool.set_min_tx_buffer_length(pad_size.try_into().unwrap());
        for offset in 0..pad_size - u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap() {
            Arc::get_mut(&mut pool)
                .expect("there are multiple owners of the underlying VMO")
                .fill_sentinel_bytes();
            let mut alloc =
                pool.alloc_tx_now_or_never(BUFFER_PARTS).expect("failed to alloc descriptors");
            alloc
                .init(
                    DEFAULT_BUFFER_LENGTH.get() * usize::from(BUFFER_PARTS)
                        - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
                        - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL),
                )
                .expect("head/body/tail sizes are representable with u16/u32/u16");
            let mut buffer = Buffer::try_from(alloc).unwrap();
            buffer.check_write_and_pad(offset.try_into().unwrap(), pad_size.try_into().unwrap());
        }
    }

    #[test_case(  0; "writes at the beginning")]
    #[test_case( 15; "writes in the first part")]
    #[test_case( 75; "writes in the second part")]
    #[test_case(135; "writes in the third part")]
    #[test_case(195; "writes in the last part")]
    fn buffer_used(write_offset: usize) {
        let pool = Pool::new_test_default();
        let mut buffer =
            pool.alloc_tx_buffer_now_or_never(MAX_BUFFER_BYTES).expect("failed to allocate buffer");
        let expected_caps = (0..netdev::MAX_DESCRIPTOR_CHAIN).map(|i| {
            if i == 0 {
                DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
            } else if i < netdev::MAX_DESCRIPTOR_CHAIN - 1 {
                DEFAULT_BUFFER_LENGTH.get()
            } else {
                DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL)
            }
        });
        assert_eq!(buffer.alloc.len(), netdev::MAX_DESCRIPTOR_CHAIN.into());
        buffer.write_at(write_offset, &[WRITE_BYTE][..]).expect("failed to write to buffer");
        assert_eq!(
            buffer.parts.iter().zip(expected_caps).fold(
                Some(write_offset),
                |offset, (part, expected_cap)| {
                    assert_eq!(part.cap, expected_cap);
                    match offset {
                        Some(offset) => {
                            if offset >= expected_cap {
                                assert_eq!(part.len, part.cap);
                                Some(offset - part.len)
                            } else {
                                assert_eq!(part.len, offset + 1);
                                let mut buf = [0];
                                assert_matches!(part.read_at(offset, &mut buf), Ok(1));
                                assert_eq!(buf[0], WRITE_BYTE);
                                None
                            }
                        }
                        None => {
                            assert_eq!(part.len, 0);
                            None
                        }
                    }
                }
            ),
            None
        )
    }

    #[test]
    fn buffer_commit() {
        let pool = Pool::new_test_default();
        for offset in 0..MAX_BUFFER_BYTES {
            let mut buffer = pool
                .alloc_tx_buffer_now_or_never(MAX_BUFFER_BYTES)
                .expect("failed to allocate buffer");
            buffer.write_at(offset, &[1][..]).expect("failed to write to buffer");
            buffer.commit();
            for (part, descriptor) in buffer.parts.iter().zip(buffer.alloc.descriptors()) {
                let head_length = descriptor.head_length();
                let tail_length = descriptor.tail_length();
                let data_length = descriptor.data_length();
                assert_eq!(u32::try_from(part.len).unwrap(), data_length);
                assert_eq!(
                    u32::from(head_length + tail_length) + data_length,
                    u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap(),
                );
            }
        }
    }

    #[test]
    fn allocate_under_device_minimum() {
        const MIN_TX_DATA: usize = 32;
        const ALLOC_SIZE: usize = 16;
        const WRITE_BYTE: u8 = 0xff;
        const WRITE_SENTINEL_BYTE: u8 = 0xee;
        const READ_SENTINEL_BYTE: u8 = 0xdd;
        let mut config = DEFAULT_CONFIG;
        config.buffer_layout.min_tx_data = MIN_TX_DATA;
        let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create a new pool");
        for mut buffer in Vec::from_iter(std::iter::from_fn({
            let pool = pool.clone();
            move || pool.alloc_tx_buffer_now_or_never(MIN_TX_DATA)
        })) {
            buffer.write_at(0, &[WRITE_SENTINEL_BYTE; MIN_TX_DATA]).expect("failed to write");
        }
        let mut allocated =
            pool.alloc_tx_buffer_now_or_never(16).expect("failed to allocate buffer");
        assert_eq!(allocated.cap(), ALLOC_SIZE);
        const WRITE_BUF_SIZE: usize = ALLOC_SIZE + 1;
        assert_matches!(
            allocated.write_at(0, &[WRITE_BYTE; WRITE_BUF_SIZE]),
            Err(Error::TooSmall { size: ALLOC_SIZE, offset: 0, length: WRITE_BUF_SIZE })
        );
        allocated.write_at(0, &[WRITE_BYTE; ALLOC_SIZE]).expect("failed to write to buffer");
        assert_matches!(allocated.pad(), Ok(()));
        assert_eq!(allocated.cap(), MIN_TX_DATA);
        assert_eq!(allocated.len(), MIN_TX_DATA);
        const READ_BUF_SIZE: usize = MIN_TX_DATA + 1;
        let mut read_buf = [READ_SENTINEL_BYTE; READ_BUF_SIZE];
        assert_matches!(
            allocated.read_at(0, &mut read_buf[..]),
            Err(Error::TooSmall { size: MIN_TX_DATA, offset: 0, length: READ_BUF_SIZE })
        );
        allocated.read_at(0, &mut read_buf[..MIN_TX_DATA]).expect("failed to read from buffer");
        assert_eq!(&read_buf[..ALLOC_SIZE], &[WRITE_BYTE; ALLOC_SIZE][..]);
        assert_eq!(&read_buf[ALLOC_SIZE..MIN_TX_DATA], &[0x0; ALLOC_SIZE][..]);
        assert_eq!(&read_buf[MIN_TX_DATA..], &[READ_SENTINEL_BYTE; 1][..]);
    }

    #[test]
    fn invalid_tx_length() {
        let mut config = DEFAULT_CONFIG;
        config.buffer_layout.length = usize::from(u16::MAX) + 2;
        config.buffer_layout.min_tx_head = 0;
        let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create pool");
        assert_matches!(pool.alloc_tx_buffer(1).now_or_never(), Some(Err(Error::TxLength)));
    }

    #[test]
    fn rx_leases() {
        let mut executor = fuchsia_async::TestExecutor::new();
        let state = RxLeaseHandlingState::new_with_enabled(true);
        let mut watcher = RxLeaseWatcher { state: &state };

        {
            let mut fut = pin!(watcher.wait_until(0));
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
        }
        {
            state.rx_complete();
            let mut fut = pin!(watcher.wait_until(1));
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
        }
        {
            let mut fut = pin!(watcher.wait_until(0));
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
        }
        {
            let mut fut = pin!(watcher.wait_until(3));
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
            state.rx_complete();
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
            state.rx_complete();
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
        }
        // Dropping a pending watch must restore the counter.
        let counter_before = state.rx_frame_counter.load(atomic::Ordering::SeqCst);
        {
            let mut fut = pin!(watcher.wait_until(10000));
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
        }
        let counter_after = state.rx_frame_counter.load(atomic::Ordering::SeqCst);
        assert_eq!(counter_before, counter_after);
    }
}