1use fuchsia_sync::Mutex;
8use futures::task::AtomicWaker;
9use std::borrow::Borrow;
10use std::collections::VecDeque;
11use std::convert::TryInto as _;
12use std::fmt::Debug;
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::mem::{ManuallyDrop, MaybeUninit};
15use std::num::TryFromIntError;
16use std::ops::{Deref, DerefMut};
17use std::ptr::NonNull;
18use std::sync::atomic::{self, AtomicBool, AtomicU64};
19use std::sync::Arc;
20use std::task::Poll;
21
22use explicit::ResultExt as _;
23use fidl_fuchsia_hardware_network as netdev;
24use fuchsia_runtime::vmar_root_self;
25use futures::channel::oneshot::{channel, Receiver, Sender};
26use zx::AsHandleRef as _;
27
28use super::{ChainLength, DescId, DescRef, DescRefMut, Descriptors};
29use crate::error::{Error, Result};
30use crate::session::{BufferLayout, Config, Pending, Port};
31
/// Pool of buffers backed by the session's data VMO, together with the
/// descriptors that describe those buffers.
pub(in crate::session) struct Pool {
    /// Base address at which the data VMO is mapped in the root VMAR.
    base: NonNull<u8>,
    /// Length in bytes of the mapping that starts at `base`.
    bytes: usize,
    /// Descriptors referring to buffers inside the data VMO.
    descriptors: Descriptors,
    /// Free TX descriptors plus the queue of TX allocation requests that
    /// could not be satisfied immediately.
    tx_alloc_state: Mutex<TxAllocState>,
    /// RX descriptors that were handed back through `free_rx`.
    pub(in crate::session) rx_pending: Pending<Rx>,
    /// Layout (head/tail/data minimums and buffer length) applied to buffers.
    buffer_layout: BufferLayout,
    /// Bookkeeping used to implement rx lease watching.
    rx_leases: RxLeaseHandlingState,
}
50
// `Pool` stores a raw `NonNull<u8>` pointing at the mapped data VMO, so the
// compiler does not derive `Send`/`Sync` for it automatically.
// NOTE(review): presumably sound because access to the mapped memory is
// mediated by descriptor ownership (`AllocGuard`) and the TX state is behind
// a `Mutex` — confirm against the descriptor ownership rules.
unsafe impl Send for Pool {}
unsafe impl Sync for Pool {}
57
/// State protected by `Pool::tx_alloc_state`.
struct TxAllocState {
    /// Allocation requests waiting for descriptors, served front to back.
    requests: VecDeque<TxAllocReq>,
    /// Free TX descriptors.
    free_list: TxFreeList,
}
64
/// Singly linked list of free TX descriptors; the links are stored in each
/// descriptor's `nxt` field.
struct TxFreeList {
    /// First free descriptor, or `None` when the list is empty.
    head: Option<DescId<Tx>>,
    /// Number of descriptors currently in the list.
    len: u16,
}
79
80impl Pool {
81 pub(in crate::session) fn new(config: Config) -> Result<(Arc<Self>, zx::Vmo, zx::Vmo)> {
86 let Config { buffer_stride, num_rx_buffers, num_tx_buffers, options, buffer_layout } =
87 config;
88 let num_buffers = num_rx_buffers.get() + num_tx_buffers.get();
89 let (descriptors, descriptors_vmo, tx_free, mut rx_free) =
90 Descriptors::new(num_tx_buffers, num_rx_buffers, buffer_stride)?;
91
92 let free_head = tx_free.into_iter().rev().fold(None, |head, mut curr| {
94 descriptors.borrow_mut(&mut curr).set_nxt(head);
95 Some(curr)
96 });
97
98 for rx_desc in rx_free.iter_mut() {
99 descriptors.borrow_mut(rx_desc).initialize(
100 ChainLength::ZERO,
101 0,
102 buffer_layout.length.try_into().unwrap(),
103 0,
104 );
105 }
106
107 let tx_alloc_state = TxAllocState {
108 free_list: TxFreeList { head: free_head, len: num_tx_buffers.get() },
109 requests: VecDeque::new(),
110 };
111
112 let size = buffer_stride.get() * u64::from(num_buffers);
113 let data_vmo = zx::Vmo::create(size).map_err(|status| Error::Vmo("data", status))?;
114
115 const VMO_NAME: zx::Name =
116 const_unwrap::const_unwrap_result(zx::Name::new("netdevice:data"));
117 data_vmo.set_name(&VMO_NAME).map_err(|status| Error::Vmo("set name", status))?;
118 let len = isize::try_from(size).expect("VMO size larger than isize::MAX") as usize;
123 let base = NonNull::new(
126 vmar_root_self()
127 .map(0, &data_vmo, 0, len, zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE)
128 .map_err(|status| Error::Map("data", status))? as *mut u8,
129 )
130 .unwrap();
131
132 Ok((
133 Arc::new(Pool {
134 base,
135 bytes: len,
136 descriptors,
137 tx_alloc_state: Mutex::new(tx_alloc_state),
138 rx_pending: Pending::new(rx_free),
139 buffer_layout,
140 rx_leases: RxLeaseHandlingState::new_with_flags(options),
141 }),
142 descriptors_vmo,
143 data_vmo,
144 ))
145 }
146
147 pub(in crate::session) async fn alloc_tx(
156 self: &Arc<Self>,
157 num_parts: ChainLength,
158 ) -> AllocGuard<Tx> {
159 let receiver = {
160 let mut state = self.tx_alloc_state.lock();
161 match state.free_list.try_alloc(num_parts, &self.descriptors) {
162 Some(allocated) => {
163 return AllocGuard::new(allocated, self.clone());
164 }
165 None => {
166 let (request, receiver) = TxAllocReq::new(num_parts);
167 state.requests.push_back(request);
168 receiver
169 }
170 }
171 };
172 receiver.await.unwrap()
174 }
175
176 pub(in crate::session) async fn alloc_tx_buffer(
183 self: &Arc<Self>,
184 num_bytes: usize,
185 ) -> Result<Buffer<Tx>> {
186 self.alloc_tx_buffers(num_bytes).await?.next().unwrap()
187 }
188
189 pub(in crate::session) async fn alloc_tx_buffers<'a>(
202 self: &'a Arc<Self>,
203 num_bytes: usize,
204 ) -> Result<impl Iterator<Item = Result<Buffer<Tx>>> + 'a> {
205 let BufferLayout { min_tx_data, min_tx_head, min_tx_tail, length: buffer_length } =
206 self.buffer_layout;
207 let tx_head = usize::from(min_tx_head);
208 let tx_tail = usize::from(min_tx_tail);
209 let total_bytes = num_bytes.max(min_tx_data) + tx_head + tx_tail;
210 let num_parts = (total_bytes + buffer_length - 1) / buffer_length;
211 let chain_length = ChainLength::try_from(num_parts)?;
212 let first = self.alloc_tx(chain_length).await;
213 let iter = std::iter::once(first)
214 .chain(std::iter::from_fn(move || {
215 let mut state = self.tx_alloc_state.lock();
216 state
217 .free_list
218 .try_alloc(chain_length, &self.descriptors)
219 .map(|allocated| AllocGuard::new(allocated, self.clone()))
220 }))
221 .fuse()
224 .map(move |mut alloc| {
225 alloc.init(num_bytes)?;
226 Ok(alloc.into())
227 });
228 Ok(iter)
229 }
230
231 pub(in crate::session) fn free_rx(&self, descs: impl IntoIterator<Item = DescId<Rx>>) {
233 self.rx_pending.extend(descs.into_iter().map(|mut desc| {
234 self.descriptors.borrow_mut(&mut desc).initialize(
235 ChainLength::ZERO,
236 0,
237 self.buffer_layout.length.try_into().unwrap(),
238 0,
239 );
240 desc
241 }));
242 }
243
244 fn free_tx(self: &Arc<Self>, chain: Chained<DescId<Tx>>) {
250 let free_impl = |free_list: &mut TxFreeList, chain: Chained<DescId<Tx>>| {
251 let mut descs = chain.into_iter();
252 free_list.len += u16::try_from(descs.len()).unwrap();
256 let head = descs.next();
257 let old_head = std::mem::replace(&mut free_list.head, head);
258 let mut tail = descs.last();
259 let mut tail_ref = self
260 .descriptors
261 .borrow_mut(tail.as_mut().unwrap_or_else(|| free_list.head.as_mut().unwrap()));
262 tail_ref.set_nxt(old_head);
263 };
264
265 let mut state = self.tx_alloc_state.lock();
266 let TxAllocState { requests, free_list } = &mut *state;
267 let () = free_impl(free_list, chain);
268
269 while let Some(req) = requests.front() {
272 match free_list.try_alloc(req.size, &self.descriptors) {
273 Some(descs) => {
274 match requests
276 .pop_front()
277 .unwrap()
278 .sender
279 .send(AllocGuard::new(descs, self.clone()))
280 .map_err(ManuallyDrop::new)
281 {
282 Ok(()) => {}
283 Err(mut alloc) => {
284 let AllocGuard { descs, pool } = alloc.deref_mut();
285 let () =
290 free_impl(free_list, std::mem::replace(descs, Chained::empty()));
291 let () = unsafe {
294 std::ptr::drop_in_place(pool);
295 };
296 }
297 }
298 }
299 None => {
300 if req.sender.is_canceled() {
301 let _cancelled: Option<TxAllocReq> = requests.pop_front();
302 continue;
303 } else {
304 break;
305 }
306 }
307 }
308 }
309 }
310
311 pub(in crate::session) fn tx_completed(self: &Arc<Self>, head: DescId<Tx>) -> Result<()> {
315 let chain = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
316 Ok(self.free_tx(chain))
317 }
318
319 pub(in crate::session) fn rx_completed(
325 self: &Arc<Self>,
326 head: DescId<Rx>,
327 ) -> Result<Buffer<Rx>> {
328 let descs = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
329 let alloc = AllocGuard::new(descs, self.clone());
330 Ok(alloc.into())
331 }
332}
333
334impl Drop for Pool {
335 fn drop(&mut self) {
336 unsafe {
337 vmar_root_self()
338 .unmap(self.base.as_ptr() as usize, self.bytes)
339 .expect("failed to unmap VMO for Pool")
340 }
341 }
342}
343
344impl TxFreeList {
345 fn try_alloc(
349 &mut self,
350 num_parts: ChainLength,
351 descriptors: &Descriptors,
352 ) -> Option<Chained<DescId<Tx>>> {
353 if u16::from(num_parts.get()) > self.len {
354 return None;
355 }
356
357 let free_list = std::iter::from_fn(|| -> Option<DescId<Tx>> {
358 let new_head = self.head.as_ref().and_then(|head| {
359 let nxt = descriptors.borrow(head).nxt();
360 nxt.map(|id| unsafe {
361 DescId::from_raw(id)
364 })
365 });
366 std::mem::replace(&mut self.head, new_head)
367 });
368 let allocated = free_list.take(num_parts.get().into()).collect::<Chained<_>>();
369 assert_eq!(allocated.len(), num_parts.into());
370 self.len -= u16::from(num_parts.get());
371 Some(allocated)
372 }
373}
374
/// A buffer made of one or more parts, each backed by a descriptor owned by
/// `alloc`.
pub struct Buffer<K: AllocKind> {
    /// The descriptors backing this buffer; returned to the pool on drop.
    alloc: AllocGuard<K>,
    /// The memory regions of the buffer, one per descriptor in `alloc`.
    parts: Chained<BufferPart>,
    /// Cursor used by the `Read`, `Write` and `Seek` implementations.
    pos: usize,
}
387
388impl<K: AllocKind> Debug for Buffer<K> {
389 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
390 let Self { alloc, parts, pos } = self;
391 f.debug_struct("Buffer")
392 .field("cap", &self.cap())
393 .field("alloc", alloc)
394 .field("parts", parts)
395 .field("pos", pos)
396 .finish()
397 }
398}
399
400impl<K: AllocKind> Buffer<K> {
401 pub fn cap(&self) -> usize {
403 self.parts.iter().fold(0, |acc, part| acc + part.cap)
404 }
405
406 pub fn len(&self) -> usize {
408 self.parts.iter().fold(0, |acc, part| acc + part.len)
409 }
410
411 pub fn write_at(&mut self, offset: usize, src: &[u8]) -> Result<()> {
417 if self.cap() < offset + src.len() {
418 return Err(Error::TooSmall { size: self.cap(), offset, length: src.len() });
419 }
420 let mut part_start = 0;
421 let mut total = 0;
422 for part in self.parts.iter_mut() {
423 if offset + total < part_start + part.cap {
424 let written = part.write_at(offset + total - part_start, &src[total..])?;
425 total += written;
426 if total == src.len() {
427 break;
428 }
429 } else {
430 part.len = part.cap;
431 }
432 part_start += part.cap;
433 }
434 assert_eq!(total, src.len());
435 Ok(())
436 }
437
438 pub fn read_at(&self, offset: usize, dst: &mut [u8]) -> Result<()> {
444 if self.len() < offset + dst.len() {
445 return Err(Error::TooSmall { size: self.len(), offset, length: dst.len() });
446 }
447 let mut part_start = 0;
448 let mut total = 0;
449 for part in self.parts.iter() {
450 if offset + total < part_start + part.cap {
451 let read = part.read_at(offset + total - part_start, &mut dst[total..])?;
452 total += read;
453 if total == dst.len() {
454 break;
455 }
456 }
457 part_start += part.cap;
458 }
459 assert_eq!(total, dst.len());
460 Ok(())
461 }
462
463 pub fn as_slice_mut(&mut self) -> Option<&mut [u8]> {
465 match &mut (*self.parts)[..] {
466 [] => Some(&mut []),
467 [one] => Some(one.as_slice_mut()),
468 _ => None,
469 }
470 }
471
472 pub(in crate::session) fn pad(&mut self) -> Result<()> {
474 let num_parts = self.parts.len();
475 let BufferLayout { min_tx_tail, min_tx_data, min_tx_head: _, length: _ } =
476 self.alloc.pool.buffer_layout;
477 let mut target = min_tx_data;
478 for (i, part) in self.parts.iter_mut().enumerate() {
479 let grow_cap = if i == num_parts - 1 {
480 let descriptor =
481 self.alloc.descriptors().last().expect("descriptor must not be empty");
482 let data_length = descriptor.data_length();
483 let tail_length = descriptor.tail_length();
484 let rest = usize::try_from(data_length).unwrap() + usize::from(tail_length);
486 match rest.checked_sub(usize::from(min_tx_tail)) {
487 Some(grow_cap) => Some(grow_cap),
488 None => break,
489 }
490 } else {
491 None
492 };
493 target -= part.pad(target, grow_cap)?;
494 }
495 if target != 0 {
496 return Err(Error::Pad(min_tx_data, self.cap()));
497 }
498 Ok(())
499 }
500
501 pub(in crate::session) fn leak(mut self) -> DescId<K> {
505 let descs = std::mem::replace(&mut self.alloc.descs, Chained::empty());
506 descs.into_iter().next().unwrap()
507 }
508
509 pub fn frame_type(&self) -> Result<netdev::FrameType> {
511 self.alloc.descriptor().frame_type()
512 }
513
514 pub fn port(&self) -> Port {
516 self.alloc.descriptor().port()
517 }
518}
519
520impl Buffer<Tx> {
521 pub(in crate::session) fn commit(&mut self) {
523 for (part, mut descriptor) in self.parts.iter_mut().zip(self.alloc.descriptors_mut()) {
524 descriptor.commit(u32::try_from(part.len).unwrap())
527 }
528 }
529
530 pub fn set_port(&mut self, port: Port) {
532 self.alloc.descriptor_mut().set_port(port)
533 }
534
535 pub fn set_frame_type(&mut self, frame_type: netdev::FrameType) {
537 self.alloc.descriptor_mut().set_frame_type(frame_type)
538 }
539
540 pub fn set_tx_flags(&mut self, flags: netdev::TxFlags) {
542 self.alloc.descriptor_mut().set_tx_flags(flags)
543 }
544}
545
546impl Buffer<Rx> {
547 pub async fn into_tx(self) -> Buffer<Tx> {
549 let Buffer { alloc, parts, pos } = self;
550 Buffer { alloc: alloc.into_tx().await, parts, pos }
551 }
552
553 pub fn rx_flags(&self) -> Result<netdev::RxFlags> {
555 self.alloc.descriptor().rx_flags()
556 }
557}
558
impl AllocGuard<Rx> {
    /// Turns an RX allocation into a TX allocation of the same chain length.
    ///
    /// Waits for a fresh TX allocation of `self.descs.len` descriptors, then
    /// swaps descriptor storage with it. The returned guard therefore holds
    /// the descriptors that used to be in `self`, and `self` is dropped
    /// holding the newly allocated ones.
    async fn into_tx(mut self) -> AllocGuard<Tx> {
        let mut tx = self.pool.alloc_tx(self.descs.len).await;
        // NOTE(review): the transmute reinterprets the storage of
        // `DescId<Tx>` as storage of `DescId<Rx>`; this assumes both have the
        // same layout — confirm against the definition of `DescId`.
        std::mem::swap(&mut self.descs.storage, unsafe {
            std::mem::transmute(&mut tx.descs.storage)
        });
        tx
    }
}
578
/// Inline, fixed-capacity sequence holding up to
/// `netdev::MAX_DESCRIPTOR_CHAIN` elements.
struct Chained<T> {
    /// Backing storage; only the first `len` slots are initialized.
    storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
    /// Number of initialized elements at the front of `storage`.
    len: ChainLength,
}
584
585impl<T> Deref for Chained<T> {
586 type Target = [T];
587
588 fn deref(&self) -> &Self::Target {
589 unsafe { std::mem::transmute(&self.storage[..self.len.into()]) }
591 }
592}
593
594impl<T> DerefMut for Chained<T> {
595 fn deref_mut(&mut self) -> &mut Self::Target {
596 unsafe { std::mem::transmute(&mut self.storage[..self.len.into()]) }
598 }
599}
600
impl<T> Drop for Chained<T> {
    /// Drops the initialized elements; the uninitialized slots are left
    /// untouched.
    fn drop(&mut self) {
        // SAFETY: `deref_mut` covers exactly the first `len` slots, which are
        // the initialized ones, and they are not used again after this.
        unsafe {
            std::ptr::drop_in_place(self.deref_mut());
        }
    }
}
609
610impl<T: Debug> Debug for Chained<T> {
611 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
612 f.debug_list().entries(self.iter()).finish()
613 }
614}
615
impl<T> Chained<T> {
    /// Creates a chain with no elements.
    // The lint is allowed because the value being "assumed initialized" is an
    // array of `MaybeUninit<T>`, whose slots are allowed to be uninitialized.
    #[allow(clippy::uninit_assumed_init)]
    fn empty() -> Self {
        // SAFETY: an array of `MaybeUninit<T>` does not require
        // initialization, and `len` is zero so no slot is ever read.
        Self { storage: unsafe { MaybeUninit::uninit().assume_init() }, len: ChainLength::ZERO }
    }
}
628
629impl<T> FromIterator<T> for Chained<T> {
630 fn from_iter<I: IntoIterator<Item = T>>(elements: I) -> Self {
635 let mut result = Self::empty();
636 let mut len = 0u8;
637 for (idx, e) in elements.into_iter().enumerate() {
638 result.storage[idx] = MaybeUninit::new(e);
639 len += 1;
640 }
641 assert!(len > 0);
642 result.len = ChainLength::try_from(len).unwrap();
645 result
646 }
647}
648
impl<T> IntoIterator for Chained<T> {
    type Item = T;
    type IntoIter = ChainedIter<T>;

    /// Moves the elements into a by-value iterator.
    fn into_iter(mut self) -> Self::IntoIter {
        let len = self.len;
        // Zero the length first so that dropping `self` at the end of this
        // function does not drop the elements that now belong to the iterator.
        self.len = ChainLength::ZERO;
        // The lint is allowed because the replacement value is an array of
        // `MaybeUninit<T>`, which does not require initialization.
        #[allow(clippy::uninit_assumed_init)]
        let storage =
            std::mem::replace(&mut self.storage, unsafe { MaybeUninit::uninit().assume_init() });
        ChainedIter { storage, len, consumed: 0 }
    }
}
668
/// By-value iterator over the elements of a [`Chained`].
struct ChainedIter<T> {
    /// Backing storage; slots in `consumed..len` are still initialized.
    storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
    /// Number of elements the chain held when iteration started.
    len: ChainLength,
    /// Number of elements already yielded.
    consumed: u8,
}
674
675impl<T> Iterator for ChainedIter<T> {
676 type Item = T;
677
678 fn next(&mut self) -> Option<Self::Item> {
679 if self.consumed < self.len.get() {
680 let value = unsafe {
683 std::mem::replace(
684 &mut self.storage[usize::from(self.consumed)],
685 MaybeUninit::uninit(),
686 )
687 .assume_init()
688 };
689 self.consumed += 1;
690 Some(value)
691 } else {
692 None
693 }
694 }
695
696 fn size_hint(&self) -> (usize, Option<usize>) {
697 let len = usize::from(self.len.get() - self.consumed);
698 (len, Some(len))
699 }
700}
701
// `size_hint` returns identical lower and upper bounds, which is what
// `ExactSizeIterator::len` relies on.
impl<T> ExactSizeIterator for ChainedIter<T> {}
703
impl<T> Drop for ChainedIter<T> {
    /// Drops the elements that were never yielded.
    fn drop(&mut self) {
        // SAFETY: slots in `consumed..len` are still initialized and are not
        // used again after this.
        unsafe {
            std::ptr::drop_in_place(std::mem::transmute::<_, &mut [T]>(
                &mut self.storage[self.consumed.into()..self.len.into()],
            ));
        }
    }
}
714
/// Guard over a chain of allocated descriptors; on drop the descriptors are
/// handed to `K::free` unless the chain is empty.
pub(in crate::session) struct AllocGuard<K: AllocKind> {
    /// The descriptors owned by this guard.
    descs: Chained<DescId<K>>,
    /// The pool the descriptors belong to.
    pool: Arc<Pool>,
}
720
721impl<K: AllocKind> Debug for AllocGuard<K> {
722 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
723 let Self { descs, pool: _ } = self;
724 f.debug_struct("AllocGuard").field("descs", descs).finish()
725 }
726}
727
728impl<K: AllocKind> AllocGuard<K> {
729 fn new(descs: Chained<DescId<K>>, pool: Arc<Pool>) -> Self {
730 Self { descs, pool }
731 }
732
733 fn descriptors(&self) -> impl Iterator<Item = DescRef<'_, K>> + '_ {
735 self.descs.iter().map(move |desc| self.pool.descriptors.borrow(desc))
736 }
737
738 fn descriptors_mut(&mut self) -> impl Iterator<Item = DescRefMut<'_, K>> + '_ {
740 let descriptors = &self.pool.descriptors;
741 self.descs.iter_mut().map(move |desc| descriptors.borrow_mut(desc))
742 }
743
744 fn descriptor(&self) -> DescRef<'_, K> {
746 self.descriptors().next().expect("descriptors must not be empty")
747 }
748
749 fn descriptor_mut(&mut self) -> DescRefMut<'_, K> {
751 self.descriptors_mut().next().expect("descriptors must not be empty")
752 }
753}
754
impl AllocGuard<Tx> {
    /// Initializes the descriptors of the chain so that together they expose
    /// exactly `requested_bytes` bytes of data.
    ///
    /// The first descriptor reserves `min_tx_head` bytes of head and the last
    /// one reserves `min_tx_tail` bytes of tail. Space in a descriptor that is
    /// not needed for the requested data is added to its tail.
    ///
    /// # Errors
    ///
    /// Returns [`Error::TxLength`] if the resulting tail length does not fit
    /// in a `u16`.
    ///
    /// # Panics
    ///
    /// Panics if the chain cannot hold all of `requested_bytes`.
    fn init(&mut self, mut requested_bytes: usize) -> Result<()> {
        let len = self.len();
        let BufferLayout { min_tx_head, min_tx_tail, length: buffer_length, min_tx_data: _ } =
            self.pool.buffer_layout;
        // `clen` counts down: it is the number of descriptors that follow the
        // current one in the chain.
        for (mut descriptor, clen) in self.descriptors_mut().zip((0..len).rev()) {
            let chain_length = ChainLength::try_from(clen).unwrap();
            // Only the first descriptor carries the head and only the last
            // one carries the tail.
            let head_length = if clen + 1 == len { min_tx_head } else { 0 };
            let mut tail_length = if clen == 0 { min_tx_tail } else { 0 };

            // NOTE(review): the subtraction assumes
            // `buffer_length >= head + tail`; presumably validated when the
            // config was built — confirm.
            let available_bytes =
                u32::try_from(buffer_length - usize::from(head_length) - usize::from(tail_length))
                    .unwrap();

            let data_length = match u32::try_from(requested_bytes) {
                Ok(requested) => {
                    if requested < available_bytes {
                        // Fewer bytes requested than available: move the
                        // excess into the tail.
                        tail_length = u16::try_from(available_bytes - requested)
                            .ok_checked::<TryFromIntError>()
                            .and_then(|tail_adjustment| tail_length.checked_add(tail_adjustment))
                            .ok_or(Error::TxLength)?;
                    }
                    requested.min(available_bytes)
                }
                // More than `u32::MAX` bytes still requested: use everything
                // this descriptor has.
                Err(TryFromIntError { .. }) => available_bytes,
            };

            requested_bytes -=
                usize::try_from(data_length).unwrap_or_else(|TryFromIntError { .. }| {
                    panic!(
                        "data_length: {} must be smaller than requested_bytes: {}, which is a usize",
                        data_length, requested_bytes
                    )
                });
            descriptor.initialize(chain_length, head_length, data_length, tail_length);
        }
        assert_eq!(requested_bytes, 0);
        Ok(())
    }
}
801
802impl<K: AllocKind> Drop for AllocGuard<K> {
803 fn drop(&mut self) {
804 if self.is_empty() {
805 return;
806 }
807 K::free(private::Allocation(self));
808 }
809}
810
811impl<K: AllocKind> Deref for AllocGuard<K> {
812 type Target = [DescId<K>];
813
814 fn deref(&self) -> &Self::Target {
815 self.descs.deref()
816 }
817}
818
/// One contiguous region of memory that is part of a [`Buffer`].
struct BufferPart {
    /// Start of the region.
    ptr: *mut u8,
    /// Number of bytes that may be written to the region.
    cap: usize,
    /// Number of bytes that may be read from the region; grows as data is
    /// written or padded.
    len: usize,
}
832
833impl BufferPart {
834 unsafe fn new(ptr: *mut u8, cap: usize, len: usize) -> Self {
843 Self { ptr, cap, len }
844 }
845
846 fn read_at(&self, offset: usize, dst: &mut [u8]) -> Result<usize> {
852 let available = self.len.checked_sub(offset).ok_or(Error::Index(offset, self.len))?;
853 let to_copy = std::cmp::min(available, dst.len());
854 unsafe { std::ptr::copy_nonoverlapping(self.ptr.add(offset), dst.as_mut_ptr(), to_copy) }
857 Ok(to_copy)
858 }
859
860 fn write_at(&mut self, offset: usize, src: &[u8]) -> Result<usize> {
866 let available = self.cap.checked_sub(offset).ok_or(Error::Index(offset, self.cap))?;
867 let to_copy = std::cmp::min(src.len(), available);
868 unsafe { std::ptr::copy_nonoverlapping(src.as_ptr(), self.ptr.add(offset), to_copy) }
871 self.len = std::cmp::max(self.len, offset + to_copy);
872 Ok(to_copy)
873 }
874
875 fn pad(&mut self, target: usize, limit: Option<usize>) -> Result<usize> {
882 if target <= self.len {
883 return Ok(target);
884 }
885 if let Some(limit) = limit {
886 if target > limit {
887 return Err(Error::Pad(target, self.cap));
888 }
889 if self.cap < target {
890 self.cap = target
891 }
892 }
893 let new_len = std::cmp::min(target, self.cap);
894 unsafe {
897 std::ptr::write_bytes(self.ptr.add(self.len), 0, new_len - self.len);
898 }
899 self.len = new_len;
900 Ok(new_len)
901 }
902
903 fn as_slice_mut(&mut self) -> &mut [u8] {
905 unsafe { std::slice::from_raw_parts_mut(self.ptr, self.len) }
909 }
910}
911
// `BufferPart` holds a raw pointer, so `Send` is not derived automatically.
// NOTE(review): presumably sound because each part refers to memory owned
// exclusively through its descriptor — confirm.
unsafe impl Send for BufferPart {}
916
917impl Debug for BufferPart {
918 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
919 let BufferPart { len, cap, ptr } = &self;
920 f.debug_struct("BufferPart").field("ptr", ptr).field("len", len).field("cap", cap).finish()
921 }
922}
923
924impl<K: AllocKind> From<AllocGuard<K>> for Buffer<K> {
925 fn from(alloc: AllocGuard<K>) -> Self {
926 let AllocGuard { pool, descs: _ } = &alloc;
927 let parts: Chained<BufferPart> = alloc
928 .descriptors()
929 .map(|descriptor| {
930 let offset = usize::try_from(descriptor.offset()).unwrap();
933 let head_length = usize::from(descriptor.head_length());
934 let data_length = usize::try_from(descriptor.data_length()).unwrap();
935 let len = match K::REFL {
936 AllocKindRefl::Tx => 0,
937 AllocKindRefl::Rx => data_length,
938 };
939 assert!(
941 offset + head_length <= pool.bytes,
942 "buffer part starts beyond the end of pool"
943 );
944 assert!(
945 offset + head_length + data_length <= pool.bytes,
946 "buffer part ends beyond the end of pool"
947 );
948 unsafe {
954 BufferPart::new(pool.base.as_ptr().add(offset + head_length), data_length, len)
955 }
956 })
957 .collect();
958 Self { alloc, parts, pos: 0 }
959 }
960}
961
962impl<K: AllocKind> Read for Buffer<K> {
963 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
964 self.read_at(self.pos, buf)
965 .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidInput, error))?;
966 self.pos += buf.len();
967 Ok(buf.len())
968 }
969}
970
971impl Write for Buffer<Tx> {
972 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
973 self.write_at(self.pos, buf)
974 .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidInput, error))?;
975 self.pos += buf.len();
976 Ok(buf.len())
977 }
978
979 fn flush(&mut self) -> std::io::Result<()> {
980 Ok(())
981 }
982}
983
984impl<K: AllocKind> Seek for Buffer<K> {
985 fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
986 let pos = match pos {
987 SeekFrom::Start(pos) => pos,
988 SeekFrom::End(offset) => {
989 let end = i64::try_from(self.cap())
990 .map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
991 u64::try_from(end.wrapping_add(offset)).unwrap()
992 }
993 SeekFrom::Current(offset) => {
994 let current = i64::try_from(self.pos)
995 .map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
996 u64::try_from(current.wrapping_add(offset)).unwrap()
997 }
998 };
999 self.pos =
1000 usize::try_from(pos).map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
1001 Ok(pos)
1002 }
1003}
1004
/// A queued request for TX descriptors.
struct TxAllocReq {
    /// Channel on which the fulfilled allocation is delivered.
    sender: Sender<AllocGuard<Tx>>,
    /// Number of descriptors requested.
    size: ChainLength,
}
1010
1011impl TxAllocReq {
1012 fn new(size: ChainLength) -> (Self, Receiver<AllocGuard<Tx>>) {
1013 let (sender, receiver) = channel();
1014 (TxAllocReq { sender, size }, receiver)
1015 }
1016}
1017
/// Items that appear in the signature of the public [`AllocKind`] trait but
/// are not meant to be used from outside this module.
mod private {
    use super::{AllocKind, Rx, Tx};
    /// Supertrait of [`AllocKind`]; it is implemented only for [`Rx`] and
    /// [`Tx`] here.
    pub trait Sealed: 'static + Sized {}
    impl Sealed for Rx {}
    impl Sealed for Tx {}

    /// Wrapper around the allocation being freed; the inner field is only
    /// accessible from the parent module.
    pub struct Allocation<'a, K: AllocKind>(pub(super) &'a mut super::AllocGuard<K>);
}
1031
/// The kind of an allocation; implemented by [`Tx`] and [`Rx`].
pub trait AllocKind: private::Sealed {
    /// Value-level reflection of the kind.
    const REFL: AllocKindRefl;

    /// Returns the descriptors held by `alloc` to the pool.
    fn free(alloc: private::Allocation<'_, Self>);
}
1041
/// Type-level marker for TX allocations; has no values.
pub enum Tx {}
/// Type-level marker for RX allocations; has no values.
pub enum Rx {}
1046
/// Value-level counterpart of the [`Tx`] and [`Rx`] markers.
pub enum AllocKindRefl {
    /// Corresponds to [`Tx`].
    Tx,
    /// Corresponds to [`Rx`].
    Rx,
}
1052
1053impl AllocKindRefl {
1054 pub(in crate::session) fn as_str(&self) -> &'static str {
1055 match self {
1056 AllocKindRefl::Tx => "Tx",
1057 AllocKindRefl::Rx => "Rx",
1058 }
1059 }
1060}
1061
1062impl AllocKind for Tx {
1063 const REFL: AllocKindRefl = AllocKindRefl::Tx;
1064
1065 fn free(alloc: private::Allocation<'_, Self>) {
1066 let private::Allocation(AllocGuard { pool, descs }) = alloc;
1067 pool.free_tx(std::mem::replace(descs, Chained::empty()));
1068 }
1069}
1070
1071impl AllocKind for Rx {
1072 const REFL: AllocKindRefl = AllocKindRefl::Rx;
1073
1074 fn free(alloc: private::Allocation<'_, Self>) {
1075 let private::Allocation(AllocGuard { pool, descs }) = alloc;
1076 pool.free_rx(std::mem::replace(descs, Chained::empty()));
1077 pool.rx_leases.rx_complete();
1078 }
1079}
1080
/// State shared between RX completion and [`RxLeaseWatcher`].
pub(in crate::session) struct RxLeaseHandlingState {
    /// Whether a watcher may still be created; cleared by
    /// `RxLeaseWatcher::new`.
    can_watch_rx_leases: AtomicBool,
    /// Counter incremented once per completed RX frame. A waiting watcher
    /// temporarily subtracts its target frame from it (see
    /// `RxLeaseWatcher::wait_until`).
    rx_frame_counter: AtomicU64,
    /// Waker of the task waiting in `RxLeaseWatcher::wait_until`.
    rx_lease_waker: AtomicWaker,
}
1096
1097impl RxLeaseHandlingState {
1098 fn new_with_flags(flags: netdev::SessionFlags) -> Self {
1099 Self::new_with_enabled(flags.contains(netdev::SessionFlags::RECEIVE_RX_POWER_LEASES))
1100 }
1101
1102 fn new_with_enabled(enabled: bool) -> Self {
1103 Self {
1104 can_watch_rx_leases: AtomicBool::new(enabled),
1105 rx_frame_counter: AtomicU64::new(0),
1106 rx_lease_waker: AtomicWaker::new(),
1107 }
1108 }
1109
1110 fn rx_complete(&self) {
1113 let Self { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } = self;
1114 let prev = rx_frame_counter.fetch_add(1, atomic::Ordering::SeqCst);
1115
1116 if prev == u64::MAX {
1119 rx_lease_waker.wake();
1120 }
1121 }
1122}
1123
/// Types that give access to an [`RxLeaseHandlingState`].
pub(in crate::session) trait RxLeaseHandlingStateContainer {
    /// Returns the contained lease handling state.
    fn lease_handling_state(&self) -> &RxLeaseHandlingState;
}
1129
1130impl<T: Borrow<RxLeaseHandlingState>> RxLeaseHandlingStateContainer for T {
1131 fn lease_handling_state(&self) -> &RxLeaseHandlingState {
1132 self.borrow()
1133 }
1134}
1135
1136impl RxLeaseHandlingStateContainer for Arc<Pool> {
1137 fn lease_handling_state(&self) -> &RxLeaseHandlingState {
1138 &self.rx_leases
1139 }
1140}
1141
/// Waits for RX frame completion counts using the lease handling state
/// reachable through `T`.
pub(in crate::session) struct RxLeaseWatcher<T> {
    /// Container that provides the [`RxLeaseHandlingState`].
    state: T,
}
1146
impl<T: RxLeaseHandlingStateContainer> RxLeaseWatcher<T> {
    /// Creates a watcher over `state`.
    ///
    /// # Panics
    ///
    /// Panics if watching is not enabled on `state`. Creating a watcher
    /// clears the flag, so a second watcher over the same state also panics.
    pub(in crate::session) fn new(state: T) -> Self {
        assert!(
            state.lease_handling_state().can_watch_rx_leases.swap(false, atomic::Ordering::SeqCst),
            "can't watch rx leases"
        );
        Self { state }
    }

    /// Waits until at least `hold_until_frame` RX frames have completed.
    ///
    /// Returns immediately if that many frames have already completed.
    pub(in crate::session) async fn wait_until(&mut self, hold_until_frame: u64) {
        let RxLeaseHandlingState { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } =
            self.state.lease_handling_state();

        // Subtract the target from the counter. If the target has not been
        // reached yet, this wraps the counter around to a large value that is
        // exactly the target's distance below zero.
        let prev = rx_frame_counter.fetch_sub(hold_until_frame, atomic::Ordering::SeqCst);
        // Restore the counter when this scope is left, whether by returning
        // or by the guard being dropped.
        let _guard = scopeguard::guard((), |()| {
            let _: u64 = rx_frame_counter.fetch_add(hold_until_frame, atomic::Ordering::SeqCst);
        });

        // The requested frame has already completed.
        if prev >= hold_until_frame {
            return;
        }
        // `threshold` is the wrapped counter value right after the
        // subtraction. Completed frames only increase the counter, so seeing
        // a value below `threshold` means it has wrapped past zero, i.e. the
        // target was reached.
        let threshold = prev.wrapping_sub(hold_until_frame);
        futures::future::poll_fn(|cx| {
            let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
            if v < threshold {
                return Poll::Ready(());
            }
            rx_lease_waker.register(cx.waker());
            // Check again after registering, so a wrap that happened between
            // the first load and the registration is not missed.
            let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
            if v < threshold {
                return Poll::Ready(());
            }
            Poll::Pending
        })
        .await;
    }
}
1211
1212#[cfg(test)]
1213mod tests {
1214 use super::*;
1215
1216 use assert_matches::assert_matches;
1217 use fuchsia_async as fasync;
1218 use futures::future::FutureExt;
1219 use test_case::test_case;
1220
1221 use std::collections::HashSet;
1222 use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize};
1223 use std::pin::pin;
1224 use std::task::{Poll, Waker};
1225
    /// Head bytes reserved in the first part of every TX buffer.
    const DEFAULT_MIN_TX_BUFFER_HEAD: u16 = 4;
    /// Tail bytes reserved in the last part of every TX buffer.
    const DEFAULT_MIN_TX_BUFFER_TAIL: u16 = 8;
    /// Length of each buffer; also used as the buffer stride.
    const DEFAULT_BUFFER_LENGTH: NonZeroUsize = NonZeroUsize::new(64).unwrap();
    /// Number of TX buffers in the default test pool.
    const DEFAULT_TX_BUFFERS: NonZeroU16 = NonZeroU16::new(8).unwrap();
    /// Number of RX buffers in the default test pool.
    const DEFAULT_RX_BUFFERS: NonZeroU16 = NonZeroU16::new(8).unwrap();
    /// Bytes available in a maximum-length chain once the default head and
    /// tail are reserved.
    const MAX_BUFFER_BYTES: usize = DEFAULT_BUFFER_LENGTH.get()
        * netdev::MAX_DESCRIPTOR_CHAIN as usize
        - DEFAULT_MIN_TX_BUFFER_HEAD as usize
        - DEFAULT_MIN_TX_BUFFER_TAIL as usize;

    /// Byte used by `fill_sentinel_bytes` to fill the pool's memory.
    const SENTINEL_BYTE: u8 = 0xab;
    /// Byte written by `check_write_and_pad`.
    const WRITE_BYTE: u8 = 1;
    /// Byte that padding is expected to produce.
    const PAD_BYTE: u8 = 0;

    /// Configuration used by `Pool::new_test_default`.
    const DEFAULT_CONFIG: Config = Config {
        buffer_stride: NonZeroU64::new(DEFAULT_BUFFER_LENGTH.get() as u64).unwrap(),
        num_rx_buffers: DEFAULT_RX_BUFFERS,
        num_tx_buffers: DEFAULT_TX_BUFFERS,
        options: netdev::SessionFlags::empty(),
        buffer_layout: BufferLayout {
            length: DEFAULT_BUFFER_LENGTH.get(),
            min_tx_head: DEFAULT_MIN_TX_BUFFER_HEAD,
            min_tx_tail: DEFAULT_MIN_TX_BUFFER_TAIL,
            min_tx_data: 0,
        },
    };
1253
1254 impl Pool {
1255 fn new_test_default() -> Arc<Self> {
1256 let (pool, _descriptors, _data) =
1257 Pool::new(DEFAULT_CONFIG).expect("failed to create default pool");
1258 pool
1259 }
1260
1261 async fn alloc_tx_checked(self: &Arc<Self>, n: u8) -> AllocGuard<Tx> {
1262 self.alloc_tx(ChainLength::try_from(n).expect("failed to convert to chain length"))
1263 .await
1264 }
1265
1266 fn alloc_tx_now_or_never(self: &Arc<Self>, n: u8) -> Option<AllocGuard<Tx>> {
1267 self.alloc_tx_checked(n).now_or_never()
1268 }
1269
1270 fn alloc_tx_all(self: &Arc<Self>, n: u8) -> Vec<AllocGuard<Tx>> {
1271 std::iter::from_fn(|| self.alloc_tx_now_or_never(n)).collect()
1272 }
1273
1274 fn alloc_tx_buffer_now_or_never(self: &Arc<Self>, num_bytes: usize) -> Option<Buffer<Tx>> {
1275 self.alloc_tx_buffer(num_bytes)
1276 .now_or_never()
1277 .transpose()
1278 .expect("invalid arguments for alloc_tx_buffer")
1279 }
1280
1281 fn set_min_tx_buffer_length(self: &mut Arc<Self>, length: usize) {
1282 Arc::get_mut(self).unwrap().buffer_layout.min_tx_data = length;
1283 }
1284
1285 fn fill_sentinel_bytes(&mut self) {
1286 unsafe { std::ptr::write_bytes(self.base.as_ptr(), SENTINEL_BYTE, self.bytes) };
1289 }
1290 }
1291
1292 impl Buffer<Tx> {
1293 fn check_write_and_pad(&mut self, offset: usize, pad_size: usize) {
1297 self.write_at(offset, &[WRITE_BYTE][..]).expect("failed to write to self");
1298 self.pad().expect("failed to pad");
1299 assert_eq!(self.len(), pad_size);
1300 const INIT_BYTE: u8 = 42;
1303 let mut read_buf = vec![INIT_BYTE; pad_size];
1304 self.read_at(0, &mut read_buf[..]).expect("failed to read from self");
1305 for (idx, byte) in read_buf.iter().enumerate() {
1306 if idx < offset {
1307 assert_eq!(*byte, SENTINEL_BYTE);
1308 } else if idx == offset {
1309 assert_eq!(*byte, WRITE_BYTE);
1310 } else {
1311 assert_eq!(*byte, PAD_BYTE);
1312 }
1313 }
1314 }
1315 }
1316
1317 impl<K, I, T> PartialEq<T> for Chained<DescId<K>>
1318 where
1319 K: AllocKind,
1320 I: ExactSizeIterator<Item = u16>,
1321 T: Copy + IntoIterator<IntoIter = I>,
1322 {
1323 fn eq(&self, other: &T) -> bool {
1324 let iter = other.into_iter();
1325 if usize::from(self.len) != iter.len() {
1326 return false;
1327 }
1328 self.iter().zip(iter).all(|(l, r)| l.get() == r)
1329 }
1330 }
1331
1332 impl Debug for TxAllocReq {
1333 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1334 let TxAllocReq { sender: _, size } = self;
1335 f.debug_struct("TxAllocReq").field("size", &size).finish_non_exhaustive()
1336 }
1337 }
1338
1339 #[test]
1340 fn alloc_tx_distinct() {
1341 let pool = Pool::new_test_default();
1342 let allocated = pool.alloc_tx_all(1);
1343 assert_eq!(allocated.len(), DEFAULT_TX_BUFFERS.get().into());
1344 let distinct = allocated
1345 .iter()
1346 .map(|alloc| {
1347 assert_eq!(alloc.descs.len(), 1);
1348 alloc.descs[0].get()
1349 })
1350 .collect::<HashSet<u16>>();
1351 assert_eq!(allocated.len(), distinct.len());
1352 }
1353
    /// The free list length drops to zero once everything is allocated and is
    /// restored when the allocations are dropped.
    #[test]
    fn alloc_tx_free_len() {
        let pool = Pool::new_test_default();
        {
            let allocated = pool.alloc_tx_all(2);
            assert_eq!(
                allocated.iter().fold(0, |acc, a| { acc + a.descs.len() }),
                DEFAULT_TX_BUFFERS.get().into()
            );
            assert_eq!(pool.tx_alloc_state.lock().free_list.len, 0);
        }
        // `allocated` went out of scope above, returning all descriptors.
        assert_eq!(pool.tx_alloc_state.lock().free_list.len, DEFAULT_TX_BUFFERS.get());
    }
1367
    /// Chains of three leave two descriptors over: another chain of three
    /// cannot be allocated, but a chain of two can.
    #[test]
    fn alloc_tx_chain() {
        let pool = Pool::new_test_default();
        let allocated = pool.alloc_tx_all(3);
        assert_eq!(allocated.len(), usize::from(DEFAULT_TX_BUFFERS.get()) / 3);
        assert_matches!(pool.alloc_tx_now_or_never(3), None);
        assert_matches!(pool.alloc_tx_now_or_never(2), Some(a) if a.descs.len() == 2);
    }
1376
    /// `alloc_tx_buffers` yields every buffer that is available, waits when
    /// none is, and its iterator stays exhausted once it has returned `None`.
    #[test]
    fn alloc_tx_many() {
        let pool = Pool::new_test_default();
        // Largest payload that still fits in a single buffer part.
        let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
            - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
            - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
        let data_len = usize::try_from(data_len).unwrap();
        let mut buffers = pool
            .alloc_tx_buffers(data_len)
            .now_or_never()
            .expect("failed to alloc")
            .unwrap()
            .collect::<Result<Vec<_>>>()
            .expect("buffer error");
        assert_eq!(buffers.len(), DEFAULT_TX_BUFFERS.get().into());

        // Everything is allocated, so another request has to wait.
        assert!(pool.alloc_tx_buffers(data_len).now_or_never().is_none());

        // Dropping one buffer makes exactly one allocation possible again.
        assert_matches!(buffers.pop(), Some(_));
        let mut more_buffers =
            pool.alloc_tx_buffers(data_len).now_or_never().expect("failed to alloc").unwrap();
        let buffer = assert_matches!(more_buffers.next(), Some(Ok(b)) => b);
        assert_matches!(more_buffers.next(), None);
        drop(buffer);
        // The iterator is fused: freeing a buffer does not revive it.
        assert_matches!(more_buffers.next(), None);
    }
1410
1411 #[test]
1412 fn alloc_tx_after_free() {
1413 let pool = Pool::new_test_default();
1414 let mut allocated = pool.alloc_tx_all(1);
1415 assert_matches!(pool.alloc_tx_now_or_never(2), None);
1416 {
1417 let _drained = allocated.drain(..2);
1418 }
1419 assert_matches!(pool.alloc_tx_now_or_never(2), Some(a) if a.descs.len() == 2);
1420 }
1421
1422 #[test]
1423 fn blocking_alloc_tx() {
1424 let mut executor = fasync::TestExecutor::new();
1425 let pool = Pool::new_test_default();
1426 let mut allocated = pool.alloc_tx_all(1);
1427 let alloc_fut = pool.alloc_tx_checked(1);
1428 let mut alloc_fut = pin!(alloc_fut);
1429 assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
1431 assert!(!pool.tx_alloc_state.lock().requests.is_empty());
1433 let freed = allocated
1434 .pop()
1435 .expect("no fulfulled allocations")
1436 .iter()
1437 .map(|x| x.get())
1438 .collect::<Chained<_>>();
1439 let same_as_freed =
1440 |descs: &Chained<DescId<Tx>>| descs.iter().map(|x| x.get()).eq(freed.iter().copied());
1441 assert_matches!(
1443 &executor.run_until_stalled(&mut alloc_fut),
1444 Poll::Ready(AllocGuard{ descs, pool: _ }) if same_as_freed(descs)
1445 );
1446 assert!(pool.tx_alloc_state.lock().requests.is_empty());
1448 }
1449
1450 #[test]
1451 fn blocking_alloc_tx_cancel_before_free() {
1452 let mut executor = fasync::TestExecutor::new();
1453 let pool = Pool::new_test_default();
1454 let mut allocated = pool.alloc_tx_all(1);
1455 {
1456 let alloc_fut = pool.alloc_tx_checked(1);
1457 let mut alloc_fut = pin!(alloc_fut);
1458 assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
1459 assert_matches!(
1460 pool.tx_alloc_state.lock().requests.as_slices(),
1461 (&[ref req1, ref req2], &[]) if req1.size.get() == 1 && req2.size.get() == 1
1462 );
1463 }
1464 assert_matches!(
1465 allocated.pop(),
1466 Some(AllocGuard { ref descs, pool: ref p })
1467 if descs == &[DEFAULT_TX_BUFFERS.get() - 1] && Arc::ptr_eq(p, &pool)
1468 );
1469 let state = pool.tx_alloc_state.lock();
1470 assert_eq!(state.free_list.len, 1);
1471 assert!(state.requests.is_empty());
1472 }
1473
1474 #[test]
1475 fn blocking_alloc_tx_cancel_after_free() {
1476 let mut executor = fasync::TestExecutor::new();
1477 let pool = Pool::new_test_default();
1478 let mut allocated = pool.alloc_tx_all(1);
1479 {
1480 let alloc_fut = pool.alloc_tx_checked(1);
1481 let mut alloc_fut = pin!(alloc_fut);
1482 assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
1483 assert_matches!(
1484 pool.tx_alloc_state.lock().requests.as_slices(),
1485 (&[ref req1, ref req2], &[]) if req1.size.get() == 1 && req2.size.get() == 1
1486 );
1487 assert_matches!(
1488 allocated.pop(),
1489 Some(AllocGuard { ref descs, pool: ref p })
1490 if descs == &[DEFAULT_TX_BUFFERS.get() - 1] && Arc::ptr_eq(p, &pool)
1491 );
1492 }
1493 let state = pool.tx_alloc_state.lock();
1494 assert_eq!(state.free_list.len, 1);
1495 assert!(state.requests.is_empty());
1496 }
1497
1498 #[test]
1499 fn multiple_blocking_alloc_tx_fulfill_order() {
1500 const TASKS_TOTAL: usize = 3;
1501 let mut executor = fasync::TestExecutor::new();
1502 let pool = Pool::new_test_default();
1503 let mut allocated = pool.alloc_tx_all(1);
1504 let mut alloc_futs = (1..=TASKS_TOTAL)
1505 .rev()
1506 .map(|x| {
1507 let pool = pool.clone();
1508 (x, Box::pin(async move { pool.alloc_tx_checked(x.try_into().unwrap()).await }))
1509 })
1510 .collect::<Vec<_>>();
1511
1512 for (idx, (req_size, task)) in alloc_futs.iter_mut().enumerate() {
1513 assert_matches!(executor.run_until_stalled(task), Poll::Pending);
1514 assert_eq!(idx + *req_size, TASKS_TOTAL);
1516 }
1517 {
1518 let state = pool.tx_alloc_state.lock();
1519 assert_eq!(state.requests.len(), TASKS_TOTAL + 1);
1521 let mut requests = state.requests.iter();
1522 assert!(requests.next().unwrap().sender.is_canceled());
1525 assert!(requests.all(|req| !req.sender.is_canceled()))
1527 }
1528
1529 let mut to_free = Vec::new();
1530 let mut freed = 0;
1531 for free_size in (1..=TASKS_TOTAL).rev() {
1532 let (_req_size, mut task) = alloc_futs.remove(0);
1533 for _ in 1..free_size {
1534 freed += 1;
1535 assert_matches!(
1536 allocated.pop(),
1537 Some(AllocGuard { ref descs, pool: ref p })
1538 if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
1539 );
1540 assert_matches!(executor.run_until_stalled(&mut task), Poll::Pending);
1541 }
1542 freed += 1;
1543 assert_matches!(
1544 allocated.pop(),
1545 Some(AllocGuard { ref descs, pool: ref p })
1546 if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
1547 );
1548 match executor.run_until_stalled(&mut task) {
1549 Poll::Ready(alloc) => {
1550 assert_eq!(alloc.len(), free_size);
1551 to_free.push(alloc);
1553 }
1554 Poll::Pending => panic!("The request should be fulfilled"),
1555 }
1556 for (_req_size, task) in alloc_futs.iter_mut() {
1558 assert_matches!(executor.run_until_stalled(task), Poll::Pending);
1559 }
1560 }
1561 assert!(pool.tx_alloc_state.lock().requests.is_empty());
1562 }
1563
1564 #[test]
1565 fn singleton_tx_layout() {
1566 let pool = Pool::new_test_default();
1567 let buffers = std::iter::from_fn(|| {
1568 let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1569 - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1570 - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1571 pool.alloc_tx_buffer_now_or_never(usize::try_from(data_len).unwrap()).map(|buffer| {
1572 assert_eq!(buffer.alloc.descriptors().count(), 1);
1573 let offset = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1574 * u64::from(buffer.alloc[0].get());
1575 {
1576 let descriptor = buffer.alloc.descriptor();
1577 assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
1578 assert_eq!(descriptor.head_length(), DEFAULT_MIN_TX_BUFFER_HEAD);
1579 assert_eq!(descriptor.tail_length(), DEFAULT_MIN_TX_BUFFER_TAIL);
1580 assert_eq!(descriptor.data_length(), data_len);
1581 assert_eq!(descriptor.offset(), offset);
1582 }
1583
1584 assert_eq!(buffer.parts.len(), 1);
1585 let BufferPart { ptr, len, cap } = buffer.parts[0];
1586 assert_eq!(len, 0);
1587 assert_eq!(
1588 pool.base.as_ptr().wrapping_add(
1591 usize::try_from(offset).unwrap() + usize::from(DEFAULT_MIN_TX_BUFFER_HEAD),
1592 ),
1593 ptr
1594 );
1595 assert_eq!(data_len, u32::try_from(cap).unwrap());
1596 buffer
1597 })
1598 })
1599 .collect::<Vec<_>>();
1600 assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()));
1601 }
1602
1603 #[test]
1604 fn chained_tx_layout() {
1605 let pool = Pool::new_test_default();
1606 let alloc_len = 4 * DEFAULT_BUFFER_LENGTH.get()
1607 - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1608 - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1609 let buffers = std::iter::from_fn(|| {
1610 pool.alloc_tx_buffer_now_or_never(alloc_len).map(|buffer| {
1611 assert_eq!(buffer.parts.len(), 4);
1612 for (idx, descriptor) in buffer.alloc.descriptors().enumerate() {
1613 let chain_length = ChainLength::try_from(buffer.alloc.len() - idx - 1).unwrap();
1614 let head_length = if idx == 0 { DEFAULT_MIN_TX_BUFFER_HEAD } else { 0 };
1615 let tail_length = if chain_length == ChainLength::ZERO {
1616 DEFAULT_MIN_TX_BUFFER_TAIL
1617 } else {
1618 0
1619 };
1620 let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1621 - u32::from(head_length)
1622 - u32::from(tail_length);
1623 let offset = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1624 * u64::from(buffer.alloc[idx].get());
1625 assert_eq!(descriptor.chain_length().unwrap(), chain_length);
1626 assert_eq!(descriptor.head_length(), head_length);
1627 assert_eq!(descriptor.tail_length(), tail_length);
1628 assert_eq!(descriptor.offset(), offset);
1629 assert_eq!(descriptor.data_length(), data_len);
1630 if chain_length != ChainLength::ZERO {
1631 assert_eq!(descriptor.nxt(), Some(buffer.alloc[idx + 1].get()));
1632 }
1633
1634 let BufferPart { ptr, cap, len } = buffer.parts[idx];
1635 assert_eq!(len, 0);
1636 assert_eq!(
1637 pool.base.as_ptr().wrapping_add(
1640 usize::try_from(offset).unwrap() + usize::from(head_length),
1641 ),
1642 ptr
1643 );
1644 assert_eq!(data_len, u32::try_from(cap).unwrap());
1645 }
1646 buffer
1647 })
1648 })
1649 .collect::<Vec<_>>();
1650 assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()) / 4);
1651 }
1652
1653 #[test]
1654 fn rx_distinct() {
1655 let pool = Pool::new_test_default();
1656 let mut guard = pool.rx_pending.inner.lock();
1657 let (descs, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
1658 assert_eq!(descs.len(), usize::from(DEFAULT_RX_BUFFERS.get()));
1659 let distinct = descs.iter().map(|desc| desc.get()).collect::<HashSet<u16>>();
1660 assert_eq!(descs.len(), distinct.len());
1661 }
1662
1663 #[test]
1664 fn alloc_rx_layout() {
1665 let pool = Pool::new_test_default();
1666 let mut guard = pool.rx_pending.inner.lock();
1667 let (descs, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
1668 assert_eq!(descs.len(), usize::from(DEFAULT_RX_BUFFERS.get()));
1669 for desc in descs.iter() {
1670 let descriptor = pool.descriptors.borrow(desc);
1671 let offset =
1672 u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap() * u64::from(desc.get());
1673 assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
1674 assert_eq!(descriptor.head_length(), 0);
1675 assert_eq!(descriptor.tail_length(), 0);
1676 assert_eq!(descriptor.offset(), offset);
1677 assert_eq!(
1678 descriptor.data_length(),
1679 u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1680 );
1681 }
1682 }
1683
1684 #[test]
1685 fn buffer_read_at_write_at() {
1686 let pool = Pool::new_test_default();
1687 let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
1688 let mut buffer =
1689 pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
1690 assert_eq!(buffer.parts.len(), 2);
1693 assert_eq!(buffer.cap(), alloc_bytes);
1694 let write_buf = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();
1695 buffer.write_at(0, &write_buf[..]).expect("failed to write into buffer");
1696 let mut read_buf = [0xff; DEFAULT_BUFFER_LENGTH.get()];
1697 buffer.read_at(0, &mut read_buf[..]).expect("failed to read from buffer");
1698 for (idx, byte) in read_buf.iter().enumerate() {
1699 assert_eq!(*byte, write_buf[idx]);
1700 }
1701 }
1702
1703 #[test]
1704 fn buffer_read_write_seek() {
1705 let pool = Pool::new_test_default();
1706 let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
1707 let mut buffer =
1708 pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
1709 assert_eq!(buffer.parts.len(), 2);
1712 assert_eq!(buffer.cap(), alloc_bytes);
1713 let write_buf = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();
1714 assert_eq!(
1715 buffer.write(&write_buf[..]).expect("failed to write into buffer"),
1716 write_buf.len()
1717 );
1718 const SEEK_FROM_END: usize = 64;
1719 const READ_LEN: usize = 12;
1720 assert_eq!(
1721 buffer.seek(SeekFrom::End(-i64::try_from(SEEK_FROM_END).unwrap())).unwrap(),
1722 u64::try_from(buffer.cap() - SEEK_FROM_END).unwrap()
1723 );
1724 let mut read_buf = [0xff; READ_LEN];
1725 assert_eq!(
1726 buffer.read(&mut read_buf[..]).expect("failed to read from buffer"),
1727 read_buf.len()
1728 );
1729 assert_eq!(&write_buf[..READ_LEN], &read_buf[..]);
1730 }
1731
1732 #[test_case(32; "single buffer part")]
1733 #[test_case(MAX_BUFFER_BYTES; "multiple buffer parts")]
1734 fn buffer_pad(pad_size: usize) {
1735 let mut pool = Pool::new_test_default();
1736 pool.set_min_tx_buffer_length(pad_size);
1737 for offset in 0..pad_size {
1738 Arc::get_mut(&mut pool)
1739 .expect("there are multiple owners of the underlying VMO")
1740 .fill_sentinel_bytes();
1741 let mut buffer =
1742 pool.alloc_tx_buffer_now_or_never(pad_size).expect("failed to allocate buffer");
1743 buffer.check_write_and_pad(offset, pad_size);
1744 }
1745 }
1746
1747 #[test]
1748 fn buffer_pad_grow() {
1749 const BUFFER_PARTS: u8 = 3;
1750 let mut pool = Pool::new_test_default();
1751 let pad_size = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1752 * u32::from(BUFFER_PARTS)
1753 - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1754 - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1755 pool.set_min_tx_buffer_length(pad_size.try_into().unwrap());
1756 for offset in 0..pad_size - u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap() {
1757 Arc::get_mut(&mut pool)
1758 .expect("there are multiple owners of the underlying VMO")
1759 .fill_sentinel_bytes();
1760 let mut alloc =
1761 pool.alloc_tx_now_or_never(BUFFER_PARTS).expect("failed to alloc descriptors");
1762 alloc
1763 .init(
1764 DEFAULT_BUFFER_LENGTH.get() * usize::from(BUFFER_PARTS)
1765 - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1766 - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL),
1767 )
1768 .expect("head/body/tail sizes are representable with u16/u32/u16");
1769 let mut buffer = Buffer::try_from(alloc).unwrap();
1770 buffer.check_write_and_pad(offset.try_into().unwrap(), pad_size.try_into().unwrap());
1771 }
1772 }
1773
1774 #[test_case( 0; "writes at the beginning")]
1775 #[test_case( 15; "writes in the first part")]
1776 #[test_case( 75; "writes in the second part")]
1777 #[test_case(135; "writes in the third part")]
1778 #[test_case(195; "writes in the last part")]
1779 fn buffer_used(write_offset: usize) {
1780 let pool = Pool::new_test_default();
1781 let mut buffer =
1782 pool.alloc_tx_buffer_now_or_never(MAX_BUFFER_BYTES).expect("failed to allocate buffer");
1783 let expected_caps = (0..netdev::MAX_DESCRIPTOR_CHAIN).map(|i| {
1784 if i == 0 {
1785 DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1786 } else if i < netdev::MAX_DESCRIPTOR_CHAIN - 1 {
1787 DEFAULT_BUFFER_LENGTH.get()
1788 } else {
1789 DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL)
1790 }
1791 });
1792 assert_eq!(buffer.alloc.len(), netdev::MAX_DESCRIPTOR_CHAIN.into());
1793 buffer.write_at(write_offset, &[WRITE_BYTE][..]).expect("failed to write to buffer");
1794 assert_eq!(
1797 buffer.parts.iter().zip(expected_caps).fold(
1798 Some(write_offset),
1799 |offset, (part, expected_cap)| {
1800 assert_eq!(part.cap, expected_cap);
1802
1803 match offset {
1804 Some(offset) => {
1805 if offset >= expected_cap {
1806 assert_eq!(part.len, part.cap);
1808 Some(offset - part.len)
1809 } else {
1810 assert_eq!(part.len, offset + 1);
1812 let mut buf = [0];
1813 assert_matches!(part.read_at(offset, &mut buf), Ok(1));
1815 assert_eq!(buf[0], WRITE_BYTE);
1816 None
1817 }
1818 }
1819 None => {
1820 assert_eq!(part.len, 0);
1822 None
1823 }
1824 }
1825 }
1826 ),
1827 None
1828 )
1829 }
1830
1831 #[test]
1832 fn buffer_commit() {
1833 let pool = Pool::new_test_default();
1834 for offset in 0..MAX_BUFFER_BYTES {
1835 let mut buffer = pool
1836 .alloc_tx_buffer_now_or_never(MAX_BUFFER_BYTES)
1837 .expect("failed to allocate buffer");
1838 buffer.write_at(offset, &[1][..]).expect("failed to write to buffer");
1839 buffer.commit();
1840 for (part, descriptor) in buffer.parts.iter().zip(buffer.alloc.descriptors()) {
1841 let head_length = descriptor.head_length();
1842 let tail_length = descriptor.tail_length();
1843 let data_length = descriptor.data_length();
1844 assert_eq!(u32::try_from(part.len).unwrap(), data_length);
1845 assert_eq!(
1846 u32::from(head_length + tail_length) + data_length,
1847 u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap(),
1848 );
1849 }
1850 }
1851 }
1852
1853 #[test]
1854 fn allocate_under_device_minimum() {
1855 const MIN_TX_DATA: usize = 32;
1856 const ALLOC_SIZE: usize = 16;
1857 const WRITE_BYTE: u8 = 0xff;
1858 const WRITE_SENTINAL_BYTE: u8 = 0xee;
1859 const READ_SENTINAL_BYTE: u8 = 0xdd;
1860 let mut config = DEFAULT_CONFIG;
1861 config.buffer_layout.min_tx_data = MIN_TX_DATA;
1862 let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create a new pool");
1863 for mut buffer in Vec::from_iter(std::iter::from_fn({
1864 let pool = pool.clone();
1865 move || pool.alloc_tx_buffer_now_or_never(MIN_TX_DATA)
1866 })) {
1867 buffer.write_at(0, &[WRITE_SENTINAL_BYTE; MIN_TX_DATA]).expect("failed to write");
1868 }
1869 let mut allocated =
1870 pool.alloc_tx_buffer_now_or_never(16).expect("failed to allocate buffer");
1871 assert_eq!(allocated.cap(), ALLOC_SIZE);
1872 const WRITE_BUF_SIZE: usize = ALLOC_SIZE + 1;
1873 assert_matches!(
1874 allocated.write_at(0, &[WRITE_BYTE; WRITE_BUF_SIZE]),
1875 Err(Error::TooSmall { size: ALLOC_SIZE, offset: 0, length: WRITE_BUF_SIZE })
1876 );
1877 allocated.write_at(0, &[WRITE_BYTE; ALLOC_SIZE]).expect("failed to write to buffer");
1878 assert_matches!(allocated.pad(), Ok(()));
1879 assert_eq!(allocated.cap(), MIN_TX_DATA);
1880 assert_eq!(allocated.len(), MIN_TX_DATA);
1881 const READ_BUF_SIZE: usize = MIN_TX_DATA + 1;
1882 let mut read_buf = [READ_SENTINAL_BYTE; READ_BUF_SIZE];
1883 assert_matches!(
1884 allocated.read_at(0, &mut read_buf[..]),
1885 Err(Error::TooSmall { size: MIN_TX_DATA, offset: 0, length: READ_BUF_SIZE })
1886 );
1887 allocated.read_at(0, &mut read_buf[..MIN_TX_DATA]).expect("failed to read from buffer");
1888 assert_eq!(&read_buf[..ALLOC_SIZE], &[WRITE_BYTE; ALLOC_SIZE][..]);
1889 assert_eq!(&read_buf[ALLOC_SIZE..MIN_TX_DATA], &[0x0; ALLOC_SIZE][..]);
1890 assert_eq!(&read_buf[MIN_TX_DATA..], &[READ_SENTINAL_BYTE; 1][..]);
1891 }
1892
1893 #[test]
1894 fn invalid_tx_length() {
1895 let mut config = DEFAULT_CONFIG;
1896 config.buffer_layout.length = usize::from(u16::MAX) + 2;
1897 config.buffer_layout.min_tx_head = 0;
1898 let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create pool");
1899 assert_matches!(pool.alloc_tx_buffer(1).now_or_never(), Some(Err(Error::TxLength)));
1900 }
1901
1902 #[test]
1903 fn rx_leases() {
1904 let mut executor = fuchsia_async::TestExecutor::new();
1905 let state = RxLeaseHandlingState::new_with_enabled(true);
1906 let mut watcher = RxLeaseWatcher { state: &state };
1907
1908 {
1909 let mut fut = pin!(watcher.wait_until(0));
1910 assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
1911 }
1912 {
1913 state.rx_complete();
1914 let mut fut = pin!(watcher.wait_until(1));
1915 assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
1916 }
1917 {
1918 let mut fut = pin!(watcher.wait_until(0));
1919 assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
1920 }
1921 {
1922 let mut fut = pin!(watcher.wait_until(3));
1923 assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
1924 state.rx_complete();
1925 assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
1926 state.rx_complete();
1927 assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
1928 }
1929 let counter_before = state.rx_frame_counter.load(atomic::Ordering::SeqCst);
1932 {
1933 let mut fut = pin!(watcher.wait_until(10000));
1934 assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
1935 }
1936 let counter_after = state.rx_frame_counter.load(atomic::Ordering::SeqCst);
1937 assert_eq!(counter_before, counter_after);
1938 }
1939}