1use fuchsia_sync::Mutex;
8use futures::task::AtomicWaker;
9use std::borrow::Borrow;
10use std::collections::VecDeque;
11use std::convert::TryInto as _;
12use std::fmt::Debug;
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::mem::MaybeUninit;
15use std::num::TryFromIntError;
16use std::ops::{Deref, DerefMut};
17use std::ptr::NonNull;
18use std::sync::Arc;
19use std::sync::atomic::{self, AtomicBool, AtomicU64};
20use std::task::Poll;
21
22use arrayvec::ArrayVec;
23use explicit::ResultExt as _;
24use fidl_fuchsia_hardware_network as netdev;
25use fuchsia_runtime::vmar_root_self;
26use futures::channel::oneshot::{Receiver, Sender, channel};
27use zx::AsHandleRef as _;
28
29use super::{ChainLength, DescId, DescRef, DescRefMut, Descriptors};
30use crate::error::{Error, Result};
31use crate::session::{BufferLayout, Config, Pending, Port};
32
/// Shared buffer pool backing a netdevice session's tx and rx buffers.
///
/// Owns the mapping of the data VMO into this process (`base`/`bytes`), which
/// is unmapped again in `Drop`.
pub(in crate::session) struct Pool {
    // Base address of the mapped data VMO; valid for `bytes` bytes until drop.
    base: NonNull<u8>,
    // Length of the mapping in bytes.
    bytes: usize,
    // Descriptor table shared with the device.
    descriptors: Descriptors,
    // Tx free list plus queued allocation requests.
    tx_alloc_state: Mutex<TxAllocState>,
    // Rx descriptors waiting to be handed to the device.
    pub(in crate::session) rx_pending: Pending<Rx>,
    // Head/tail/data layout parameters for every buffer.
    buffer_layout: BufferLayout,
    // State for rx power-lease handling.
    rx_leases: RxLeaseHandlingState,
}
51
// SAFETY: `base` is only a raw pointer into the pool's own VMO mapping, and
// all mutation of shared state goes through `Mutex`/atomics or the descriptor
// abstractions. NOTE(review): soundness also relies on `Descriptors` and
// `Pending` being safe to share across threads — confirm at their definitions.
unsafe impl Send for Pool {}
unsafe impl Sync for Pool {}
58
/// Mutable tx-allocation state, kept behind `Pool::tx_alloc_state`.
struct TxAllocState {
    // FIFO of allocation requests that could not be satisfied immediately;
    // fulfilled in order by `Pool::free_tx`.
    requests: VecDeque<TxAllocReq>,
    // Free tx descriptors, kept as an intrusive singly-linked list.
    free_list: TxFreeList,
}
65
/// Intrusive singly-linked free list of tx descriptors; each node's successor
/// is stored in the descriptor's `nxt` field.
struct TxFreeList {
    // First free descriptor, or `None` when the list is empty.
    head: Option<DescId<Tx>>,
    // Number of descriptors currently on the list.
    len: u16,
}
80
impl Pool {
    /// Creates a pool from `config`, returning it together with the
    /// descriptors VMO and data VMO that must be handed to the device.
    pub(in crate::session) fn new(config: Config) -> Result<(Arc<Self>, zx::Vmo, zx::Vmo)> {
        let Config { buffer_stride, num_rx_buffers, num_tx_buffers, options, buffer_layout } =
            config;
        let num_buffers = num_rx_buffers.get() + num_tx_buffers.get();
        let (descriptors, descriptors_vmo, tx_free, mut rx_free) =
            Descriptors::new(num_tx_buffers, num_rx_buffers, buffer_stride)?;

        // Thread every tx descriptor into the free list; folding from the
        // back makes the list head the first descriptor in `tx_free` order.
        let free_head = tx_free.into_iter().rev().fold(None, |head, mut curr| {
            descriptors.borrow_mut(&mut curr).set_nxt(head);
            Some(curr)
        });

        // Rx descriptors start out unchained, covering the whole buffer.
        for rx_desc in rx_free.iter_mut() {
            descriptors.borrow_mut(rx_desc).initialize(
                ChainLength::ZERO,
                0,
                buffer_layout.length.try_into().unwrap(),
                0,
            );
        }

        let tx_alloc_state = TxAllocState {
            free_list: TxFreeList { head: free_head, len: num_tx_buffers.get() },
            requests: VecDeque::new(),
        };

        let size = buffer_stride.get() * u64::from(num_buffers);
        let data_vmo = zx::Vmo::create(size).map_err(|status| Error::Vmo("data", status))?;

        const VMO_NAME: zx::Name =
            const_unwrap::const_unwrap_result(zx::Name::new("netdevice:data"));
        data_vmo.set_name(&VMO_NAME).map_err(|status| Error::Vmo("set name", status))?;
        // Guard against mappings larger than isize::MAX before the cast.
        let len = isize::try_from(size).expect("VMO size larger than isize::MAX") as usize;
        let base = NonNull::new(
            vmar_root_self()
                .map(0, &data_vmo, 0, len, zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE)
                .map_err(|status| Error::Map("data", status))? as *mut u8,
        )
        .unwrap();

        Ok((
            Arc::new(Pool {
                base,
                bytes: len,
                descriptors,
                tx_alloc_state: Mutex::new(tx_alloc_state),
                rx_pending: Pending::new(rx_free),
                buffer_layout,
                rx_leases: RxLeaseHandlingState::new_with_flags(options),
            }),
            descriptors_vmo,
            data_vmo,
        ))
    }

    /// Allocates a chain of `num_parts` tx descriptors, waiting until enough
    /// are free.
    pub(in crate::session) async fn alloc_tx(
        self: &Arc<Self>,
        num_parts: ChainLength,
    ) -> AllocGuard<Tx> {
        // The lock guard must be dropped before awaiting the receiver.
        let receiver = {
            let mut state = self.tx_alloc_state.lock();
            match state.free_list.try_alloc(num_parts, &self.descriptors) {
                Some(allocated) => {
                    return AllocGuard::new(allocated, self.clone());
                }
                None => {
                    // Queue a request; `free_tx` fulfills it later.
                    let (request, receiver) = TxAllocReq::new(num_parts);
                    state.requests.push_back(request);
                    receiver
                }
            }
        };
        // unwrap: this future borrows the Arc, so the pool — and with it the
        // queued request's sender — stays alive until the send happens.
        receiver.await.unwrap()
    }

    /// Allocates a single tx buffer of at least `num_bytes` data bytes.
    pub(in crate::session) async fn alloc_tx_buffer(
        self: &Arc<Self>,
        num_bytes: usize,
    ) -> Result<Buffer<Tx>> {
        // unwrap: the iterator always yields at least the awaited first
        // allocation (`std::iter::once`).
        self.alloc_tx_buffers(num_bytes).await?.next().unwrap()
    }

    /// Returns an iterator of tx buffers, each with `num_bytes` of data
    /// space. The first buffer is awaited; subsequent ones are taken from the
    /// free list without waiting, so the iterator ends when the list runs dry.
    pub(in crate::session) async fn alloc_tx_buffers<'a>(
        self: &'a Arc<Self>,
        num_bytes: usize,
    ) -> Result<impl Iterator<Item = Result<Buffer<Tx>>> + 'a> {
        let BufferLayout { min_tx_data, min_tx_head, min_tx_tail, length: buffer_length } =
            self.buffer_layout;
        let tx_head = usize::from(min_tx_head);
        let tx_tail = usize::from(min_tx_tail);
        // Account for mandatory head/tail space, then round up to whole
        // buffers to get the chain length.
        let total_bytes = num_bytes.max(min_tx_data) + tx_head + tx_tail;
        let num_parts = (total_bytes + buffer_length - 1) / buffer_length;
        let chain_length = ChainLength::try_from(num_parts)?;
        let first = self.alloc_tx(chain_length).await;
        let iter = std::iter::once(first)
            .chain(std::iter::from_fn(move || {
                let mut state = self.tx_alloc_state.lock();
                state
                    .free_list
                    .try_alloc(chain_length, &self.descriptors)
                    .map(|allocated| AllocGuard::new(allocated, self.clone()))
            }))
            // Once exhausted, stay exhausted even if buffers are freed later.
            .fuse()
            .map(move |mut alloc| {
                alloc.init(num_bytes)?;
                Ok(alloc.into())
            });
        Ok(iter)
    }

    /// Resets rx descriptors to their whole-buffer, unchained state and
    /// queues them to be handed back to the device.
    pub(in crate::session) fn free_rx(&self, descs: impl IntoIterator<Item = DescId<Rx>>) {
        self.rx_pending.extend(descs.into_iter().map(|mut desc| {
            self.descriptors.borrow_mut(&mut desc).initialize(
                ChainLength::ZERO,
                0,
                self.buffer_layout.length.try_into().unwrap(),
                0,
            );
            desc
        }));
    }

    /// Returns a descriptor chain to the free list and fulfills as many
    /// queued allocation requests as the list now allows.
    fn free_tx(self: &Arc<Self>, chain: Chained<DescId<Tx>>) {
        // Fulfillment must happen with the lock released: sending on the
        // oneshot can drop the guard (receiver gone), which re-enters
        // `free_tx` and would deadlock on `tx_alloc_state`.
        let mut to_fulfill = ArrayVec::<
            (TxAllocReq, AllocGuard<Tx>),
            { netdev::MAX_DESCRIPTOR_CHAIN as usize },
        >::new();

        let mut state = self.tx_alloc_state.lock();

        {
            // Splice the chain onto the front of the free list: its head
            // becomes the new list head and its tail links to the old head.
            // A single-element chain has no `last()`; the new head is then
            // also the tail.
            let mut descs = chain.into_iter();
            state.free_list.len += u16::try_from(descs.len()).unwrap();
            let head = descs.next();
            let old_head = std::mem::replace(&mut state.free_list.head, head);
            let mut tail = descs.last();
            let mut tail_ref = self.descriptors.borrow_mut(
                tail.as_mut().unwrap_or_else(|| state.free_list.head.as_mut().unwrap()),
            );
            tail_ref.set_nxt(old_head);
        }

        while let Some(req) = state.requests.front() {
            // Drop requests whose receiver has gone away.
            if req.sender.is_canceled() {
                let _cancelled: Option<TxAllocReq> = state.requests.pop_front();
                continue;
            }
            let size = req.size;
            match state.free_list.try_alloc(size, &self.descriptors) {
                Some(descs) => {
                    let req = state.requests.pop_front().unwrap();
                    to_fulfill.push((req, AllocGuard::new(descs, self.clone())));

                    // The scratch buffer is bounded; flush it (outside the
                    // lock) and re-acquire before continuing.
                    if to_fulfill.is_full() {
                        drop(state);
                        for (req, alloc) in to_fulfill.drain(..) {
                            req.fulfill(alloc)
                        }
                        state = self.tx_alloc_state.lock();
                    }
                }
                None => break,
            }
        }

        drop(state);
        for (req, alloc) in to_fulfill {
            req.fulfill(alloc)
        }
    }

    /// Handles a tx completion from the device: walks the completed chain and
    /// returns it to the free list.
    pub(in crate::session) fn tx_completed(self: &Arc<Self>, head: DescId<Tx>) -> Result<()> {
        let chain = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
        Ok(self.free_tx(chain))
    }

    /// Handles an rx completion: wraps the completed chain into a `Buffer`
    /// exposing the received bytes.
    pub(in crate::session) fn rx_completed(
        self: &Arc<Self>,
        head: DescId<Rx>,
    ) -> Result<Buffer<Rx>> {
        let descs = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
        let alloc = AllocGuard::new(descs, self.clone());
        Ok(alloc.into())
    }
}
339
impl Drop for Pool {
    fn drop(&mut self) {
        // SAFETY: `base`/`bytes` describe exactly the mapping created in
        // `Pool::new`, and nothing else unmaps it, so unmapping once here is
        // sound.
        unsafe {
            vmar_root_self()
                .unmap(self.base.as_ptr() as usize, self.bytes)
                .expect("failed to unmap VMO for Pool")
        }
    }
}
349
impl TxFreeList {
    /// Pops `num_parts` descriptors off the free list, or returns `None` when
    /// fewer than that are free.
    fn try_alloc(
        &mut self,
        num_parts: ChainLength,
        descriptors: &Descriptors,
    ) -> Option<Chained<DescId<Tx>>> {
        if u16::from(num_parts.get()) > self.len {
            return None;
        }

        // Walk the intrusive list: each step replaces `head` with its
        // successor (read from the descriptor's `nxt` field) and yields the
        // old head.
        let free_list = std::iter::from_fn(|| -> Option<DescId<Tx>> {
            let new_head = self.head.as_ref().and_then(|head| {
                let nxt = descriptors.borrow(head).nxt();
                // SAFETY: `nxt` ids were stored from valid tx descriptor ids
                // when the list was threaded (`Pool::new` / `free_tx`), so
                // reconstituting the id is sound.
                nxt.map(|id| unsafe {
                    DescId::from_raw(id)
                })
            });
            std::mem::replace(&mut self.head, new_head)
        });
        let allocated = free_list.take(num_parts.get().into()).collect::<Chained<_>>();
        // The length check above guarantees the list had enough nodes.
        assert_eq!(allocated.len(), num_parts.into());
        self.len -= u16::from(num_parts.get());
        Some(allocated)
    }
}
380
/// A view over one allocation's memory, split into one `BufferPart` per
/// descriptor, with a cursor for the `Read`/`Write`/`Seek` implementations.
pub struct Buffer<K: AllocKind> {
    // Keeps the descriptors — and the pool mapping — alive.
    alloc: AllocGuard<K>,
    // One part per descriptor, in chain order.
    parts: Chained<BufferPart>,
    // Cursor used by the std::io trait impls.
    pos: usize,
}
393
394impl<K: AllocKind> Debug for Buffer<K> {
395 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
396 let Self { alloc, parts, pos } = self;
397 f.debug_struct("Buffer")
398 .field("cap", &self.cap())
399 .field("alloc", alloc)
400 .field("parts", parts)
401 .field("pos", pos)
402 .finish()
403 }
404}
405
impl<K: AllocKind> Buffer<K> {
    /// Total capacity in bytes across all parts.
    pub fn cap(&self) -> usize {
        self.parts.iter().fold(0, |acc, part| acc + part.cap)
    }

    /// Number of valid (written/received) bytes across all parts.
    pub fn len(&self) -> usize {
        self.parts.iter().fold(0, |acc, part| acc + part.len)
    }

    /// Writes all of `src` starting at `offset`, spanning part boundaries;
    /// fails with `Error::TooSmall` if it would exceed the capacity.
    pub fn write_at(&mut self, offset: usize, src: &[u8]) -> Result<()> {
        if self.cap() < offset + src.len() {
            return Err(Error::TooSmall { size: self.cap(), offset, length: src.len() });
        }
        // `part_start` is the absolute offset where the current part begins;
        // `total` counts the bytes written so far.
        let mut part_start = 0;
        let mut total = 0;
        for part in self.parts.iter_mut() {
            if offset + total < part_start + part.cap {
                let written = part.write_at(offset + total - part_start, &src[total..])?;
                total += written;
                if total == src.len() {
                    break;
                }
            } else {
                // Parts wholly before the write are marked fully used.
                // NOTE(review): this exposes whatever bytes those parts
                // already contain — confirm callers only write at or after
                // previously valid data.
                part.len = part.cap;
            }
            part_start += part.cap;
        }
        assert_eq!(total, src.len());
        Ok(())
    }

    /// Reads `dst.len()` bytes starting at `offset`; fails with
    /// `Error::TooSmall` if the valid region is too short.
    pub fn read_at(&self, offset: usize, dst: &mut [u8]) -> Result<()> {
        if self.len() < offset + dst.len() {
            return Err(Error::TooSmall { size: self.len(), offset, length: dst.len() });
        }
        let mut part_start = 0;
        let mut total = 0;
        for part in self.parts.iter() {
            if offset + total < part_start + part.cap {
                let read = part.read_at(offset + total - part_start, &mut dst[total..])?;
                total += read;
                if total == dst.len() {
                    break;
                }
            }
            part_start += part.cap;
        }
        assert_eq!(total, dst.len());
        Ok(())
    }

    /// Returns the buffer as a single mutable slice of its valid bytes, or
    /// `None` when the buffer is split across multiple parts.
    pub fn as_slice_mut(&mut self) -> Option<&mut [u8]> {
        match &mut (*self.parts)[..] {
            [] => Some(&mut []),
            [one] => Some(one.as_slice_mut()),
            _ => None,
        }
    }

    /// Zero-pads the buffer up to the layout's `min_tx_data`, allowing only
    /// the final part to grow into its tail space (keeping `min_tx_tail`
    /// bytes reserved).
    pub(in crate::session) fn pad(&mut self) -> Result<()> {
        let num_parts = self.parts.len();
        let BufferLayout { min_tx_tail, min_tx_data, min_tx_head: _, length: _ } =
            self.alloc.pool.buffer_layout;
        // `target` is the number of padded bytes still owed; each part
        // consumes what it can.
        let mut target = min_tx_data;
        for (i, part) in self.parts.iter_mut().enumerate() {
            let grow_cap = if i == num_parts - 1 {
                let descriptor =
                    self.alloc.descriptors().last().expect("descriptor must not be empty");
                let data_length = descriptor.data_length();
                let tail_length = descriptor.tail_length();
                // Space the last part could cover: its data plus tail, minus
                // the reserved minimum tail.
                let rest = usize::try_from(data_length).unwrap() + usize::from(tail_length);
                match rest.checked_sub(usize::from(min_tx_tail)) {
                    Some(grow_cap) => Some(grow_cap),
                    None => break,
                }
            } else {
                None
            };
            target -= part.pad(target, grow_cap)?;
        }
        if target != 0 {
            return Err(Error::Pad(min_tx_data, self.cap()));
        }
        Ok(())
    }

    /// Takes the head descriptor id out of the allocation, leaving it empty
    /// so `Drop` won't return the chain to the pool.
    ///
    /// NOTE(review): only the head id is returned; for chains the rest stays
    /// reachable via the descriptors' links — confirm callers rely on that.
    pub(in crate::session) fn leak(mut self) -> DescId<K> {
        let descs = std::mem::replace(&mut self.alloc.descs, Chained::empty());
        descs.into_iter().next().unwrap()
    }

    /// Frame type recorded in the head descriptor.
    pub fn frame_type(&self) -> Result<netdev::FrameType> {
        self.alloc.descriptor().frame_type()
    }

    /// Port recorded in the head descriptor.
    pub fn port(&self) -> Port {
        self.alloc.descriptor().port()
    }
}
525
526impl Buffer<Tx> {
527 pub(in crate::session) fn commit(&mut self) {
529 for (part, mut descriptor) in self.parts.iter_mut().zip(self.alloc.descriptors_mut()) {
530 descriptor.commit(u32::try_from(part.len).unwrap())
533 }
534 }
535
536 pub fn set_port(&mut self, port: Port) {
538 self.alloc.descriptor_mut().set_port(port)
539 }
540
541 pub fn set_frame_type(&mut self, frame_type: netdev::FrameType) {
543 self.alloc.descriptor_mut().set_frame_type(frame_type)
544 }
545
546 pub fn set_tx_flags(&mut self, flags: netdev::TxFlags) {
548 self.alloc.descriptor_mut().set_tx_flags(flags)
549 }
550}
551
552impl Buffer<Rx> {
553 pub async fn into_tx(self) -> Buffer<Tx> {
555 let Buffer { alloc, parts, pos } = self;
556 Buffer { alloc: alloc.into_tx().await, parts, pos }
557 }
558
559 pub fn rx_flags(&self) -> Result<netdev::RxFlags> {
561 self.alloc.descriptor().rx_flags()
562 }
563}
564
impl AllocGuard<Rx> {
    /// Trades this rx allocation for a tx allocation of the same chain
    /// length by swapping the descriptor-id storages, so each guard ends up
    /// freeing descriptors of its own kind.
    async fn into_tx(mut self) -> AllocGuard<Tx> {
        // The tx allocation is requested with this allocation's length, so
        // both `Chained`s have the same initialized prefix.
        let mut tx = self.pool.alloc_tx(self.descs.len).await;
        // SAFETY: the transmute reinterprets the storage of `DescId<Tx>`s as
        // storage of `DescId<Rx>`s for the swap. NOTE(review): this assumes
        // `DescId<Rx>` and `DescId<Tx>` have identical layout (same raw id
        // representation) — confirm at the `DescId` definition.
        std::mem::swap(&mut self.descs.storage, unsafe {
            std::mem::transmute(&mut tx.descs.storage)
        });
        tx
    }
}
584
/// A fixed-capacity inline container holding at most `MAX_DESCRIPTOR_CHAIN`
/// elements; invariant: the first `len` slots of `storage` are initialized.
struct Chained<T> {
    storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
    len: ChainLength,
}
590
impl<T> Deref for Chained<T> {
    type Target = [T];

    fn deref(&self) -> &Self::Target {
        // SAFETY: by the `Chained` invariant the first `len` slots are
        // initialized, so `&[MaybeUninit<T>] -> &[T]` is sound for that
        // prefix.
        unsafe { std::mem::transmute(&self.storage[..self.len.into()]) }
    }
}
599
impl<T> DerefMut for Chained<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        // SAFETY: same invariant as `Deref` — the first `len` slots are
        // initialized.
        unsafe { std::mem::transmute(&mut self.storage[..self.len.into()]) }
    }
}
606
impl<T> Drop for Chained<T> {
    fn drop(&mut self) {
        // SAFETY: `deref_mut` yields exactly the initialized prefix, and the
        // slots are never touched again after this drop.
        unsafe {
            std::ptr::drop_in_place(self.deref_mut());
        }
    }
}
615
616impl<T: Debug> Debug for Chained<T> {
617 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
618 f.debug_list().entries(self.iter()).finish()
619 }
620}
621
impl<T> Chained<T> {
    /// Creates an empty container (len 0, no initialized slots).
    #[allow(clippy::uninit_assumed_init)]
    fn empty() -> Self {
        // SAFETY: an array of `MaybeUninit<T>` requires no initialization,
        // so `assume_init` on the outer array is sound.
        Self { storage: unsafe { MaybeUninit::uninit().assume_init() }, len: ChainLength::ZERO }
    }
}
634
impl<T> FromIterator<T> for Chained<T> {
    /// Collects up to `MAX_DESCRIPTOR_CHAIN` elements.
    ///
    /// Panics if the iterator is empty (`assert!`), yields more elements than
    /// the storage holds (index out of bounds), or exceeds the valid
    /// `ChainLength` range.
    fn from_iter<I: IntoIterator<Item = T>>(elements: I) -> Self {
        let mut result = Self::empty();
        let mut len = 0u8;
        for (idx, e) in elements.into_iter().enumerate() {
            result.storage[idx] = MaybeUninit::new(e);
            len += 1;
        }
        assert!(len > 0);
        // Restores the `Chained` invariant: exactly `len` slots initialized.
        result.len = ChainLength::try_from(len).unwrap();
        result
    }
}
654
impl<T> IntoIterator for Chained<T> {
    type Item = T;
    type IntoIter = ChainedIter<T>;

    fn into_iter(mut self) -> Self::IntoIter {
        // Move length and storage into the iterator, leaving `self` with
        // len 0 and uninit storage so its `Drop` drops nothing.
        let len = self.len;
        self.len = ChainLength::ZERO;
        #[allow(clippy::uninit_assumed_init)]
        // SAFETY: replacing with an array of `MaybeUninit` needs no
        // initialization (see `Chained::empty`).
        let storage =
            std::mem::replace(&mut self.storage, unsafe { MaybeUninit::uninit().assume_init() });
        ChainedIter { storage, len, consumed: 0 }
    }
}
674
/// Owning iterator over a `Chained<T>`; invariant: slots in
/// `consumed..len` are still initialized.
struct ChainedIter<T> {
    storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
    len: ChainLength,
    consumed: u8,
}
680
impl<T> Iterator for ChainedIter<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        if self.consumed < self.len.get() {
            // SAFETY: `consumed < len` means this slot is still initialized;
            // replacing it with uninit prevents a double drop later.
            let value = unsafe {
                std::mem::replace(
                    &mut self.storage[usize::from(self.consumed)],
                    MaybeUninit::uninit(),
                )
                .assume_init()
            };
            self.consumed += 1;
            Some(value)
        } else {
            None
        }
    }

    // Exact: the remaining count is known.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let len = usize::from(self.len.get() - self.consumed);
        (len, Some(len))
    }
}
707
// `size_hint` above is exact, so `ExactSizeIterator` is correct.
impl<T> ExactSizeIterator for ChainedIter<T> {}
709
impl<T> Drop for ChainedIter<T> {
    fn drop(&mut self) {
        // SAFETY: only the slots in `consumed..len` are still initialized
        // (see `next`), and they are dropped exactly once here.
        unsafe {
            std::ptr::drop_in_place(std::mem::transmute::<_, &mut [T]>(
                &mut self.storage[self.consumed.into()..self.len.into()],
            ));
        }
    }
}
720
/// RAII ownership of a chain of descriptors; on drop they are returned to the
/// pool via `AllocKind::free`.
pub(in crate::session) struct AllocGuard<K: AllocKind> {
    // The owned descriptor chain; empty after `Buffer::leak` or kind::free.
    descs: Chained<DescId<K>>,
    // The pool the descriptors came from (and are returned to).
    pool: Arc<Pool>,
}
726
727impl<K: AllocKind> Debug for AllocGuard<K> {
728 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
729 let Self { descs, pool: _ } = self;
730 f.debug_struct("AllocGuard").field("descs", descs).finish()
731 }
732}
733
734impl<K: AllocKind> AllocGuard<K> {
735 fn new(descs: Chained<DescId<K>>, pool: Arc<Pool>) -> Self {
736 Self { descs, pool }
737 }
738
739 fn descriptors(&self) -> impl Iterator<Item = DescRef<'_, K>> + '_ {
741 self.descs.iter().map(move |desc| self.pool.descriptors.borrow(desc))
742 }
743
744 fn descriptors_mut(&mut self) -> impl Iterator<Item = DescRefMut<'_, K>> + '_ {
746 let descriptors = &self.pool.descriptors;
747 self.descs.iter_mut().map(move |desc| descriptors.borrow_mut(desc))
748 }
749
750 fn descriptor(&self) -> DescRef<'_, K> {
752 self.descriptors().next().expect("descriptors must not be empty")
753 }
754
755 fn descriptor_mut(&mut self) -> DescRefMut<'_, K> {
757 self.descriptors_mut().next().expect("descriptors must not be empty")
758 }
759}
760
impl AllocGuard<Tx> {
    /// Distributes `requested_bytes` of data space over the descriptor chain.
    ///
    /// Descriptors are paired with a descending remaining-chain count `clen`,
    /// so the first descriptor (clen == len-1) reserves `min_tx_head` and the
    /// last (clen == 0) reserves `min_tx_tail`. Unused space in the final
    /// descriptor is converted into extra tail length.
    fn init(&mut self, mut requested_bytes: usize) -> Result<()> {
        let len = self.len();
        let BufferLayout { min_tx_head, min_tx_tail, length: buffer_length, min_tx_data: _ } =
            self.pool.buffer_layout;
        for (mut descriptor, clen) in self.descriptors_mut().zip((0..len).rev()) {
            let chain_length = ChainLength::try_from(clen).unwrap();
            let head_length = if clen + 1 == len { min_tx_head } else { 0 };
            let mut tail_length = if clen == 0 { min_tx_tail } else { 0 };

            // Data space left in this buffer after head/tail reservations.
            let available_bytes =
                u32::try_from(buffer_length - usize::from(head_length) - usize::from(tail_length))
                    .unwrap();

            let data_length = match u32::try_from(requested_bytes) {
                Ok(requested) => {
                    if requested < available_bytes {
                        // Surplus space becomes additional tail; overflow of
                        // the u16 tail is a tx-length error.
                        tail_length = u16::try_from(available_bytes - requested)
                            .ok_checked::<TryFromIntError>()
                            .and_then(|tail_adjustment| tail_length.checked_add(tail_adjustment))
                            .ok_or(Error::TxLength)?;
                    }
                    requested.min(available_bytes)
                }
                // More than u32::MAX still requested: fill this buffer whole.
                Err(TryFromIntError { .. }) => available_bytes,
            };

            requested_bytes -=
                usize::try_from(data_length).unwrap_or_else(|TryFromIntError { .. }| {
                    panic!(
                        "data_length: {} must be smaller than requested_bytes: {}, which is a usize",
                        data_length, requested_bytes
                    )
                });
            descriptor.initialize(chain_length, head_length, data_length, tail_length);
        }
        // The chain was sized to cover the request exactly (see
        // `alloc_tx_buffers`), so nothing may be left over.
        assert_eq!(requested_bytes, 0);
        Ok(())
    }
}
807
808impl<K: AllocKind> Drop for AllocGuard<K> {
809 fn drop(&mut self) {
810 if self.is_empty() {
811 return;
812 }
813 K::free(private::Allocation(self));
814 }
815}
816
817impl<K: AllocKind> Deref for AllocGuard<K> {
818 type Target = [DescId<K>];
819
820 fn deref(&self) -> &Self::Target {
821 self.descs.deref()
822 }
823}
824
/// One contiguous region of buffer memory, backed by the pool mapping.
struct BufferPart {
    // Start of the region; validity guaranteed by `BufferPart::new`'s caller.
    ptr: *mut u8,
    // Writable capacity in bytes.
    cap: usize,
    // Number of valid bytes (<= cap, except while `pad` grows cap).
    len: usize,
}
838
impl BufferPart {
    /// Creates a part over `cap` bytes at `ptr`, `len` of which are valid.
    ///
    /// # Safety
    ///
    /// `ptr` must point to at least `cap` bytes that remain valid, and not be
    /// written through elsewhere, for this part's lifetime.
    unsafe fn new(ptr: *mut u8, cap: usize, len: usize) -> Self {
        Self { ptr, cap, len }
    }

    /// Copies up to `dst.len()` valid bytes starting at `offset` into `dst`,
    /// returning the number copied; errors if `offset` is past `len`.
    fn read_at(&self, offset: usize, dst: &mut [u8]) -> Result<usize> {
        let available = self.len.checked_sub(offset).ok_or(Error::Index(offset, self.len))?;
        let to_copy = std::cmp::min(available, dst.len());
        // SAFETY: `offset + to_copy <= len <= cap`, within the region
        // guaranteed by `new`; `dst` is a distinct slice, so no overlap.
        unsafe { std::ptr::copy_nonoverlapping(self.ptr.add(offset), dst.as_mut_ptr(), to_copy) }
        Ok(to_copy)
    }

    /// Copies up to `available` bytes of `src` to `offset`, returning the
    /// number copied and extending `len` accordingly; errors if `offset` is
    /// past `cap`.
    fn write_at(&mut self, offset: usize, src: &[u8]) -> Result<usize> {
        let available = self.cap.checked_sub(offset).ok_or(Error::Index(offset, self.cap))?;
        let to_copy = std::cmp::min(src.len(), available);
        // SAFETY: `offset + to_copy <= cap`, within the region guaranteed by
        // `new`; `src` is a distinct slice, so no overlap.
        unsafe { std::ptr::copy_nonoverlapping(src.as_ptr(), self.ptr.add(offset), to_copy) }
        self.len = std::cmp::max(self.len, offset + to_copy);
        Ok(to_copy)
    }

    /// Zero-fills from `len` towards `target` bytes, returning how much of
    /// `target` this part now covers. `limit`, when given (the final part of
    /// a buffer), allows `cap` to grow up to `target`; exceeding `limit` is
    /// an error.
    fn pad(&mut self, target: usize, limit: Option<usize>) -> Result<usize> {
        if target <= self.len {
            return Ok(target);
        }
        if let Some(limit) = limit {
            if target > limit {
                return Err(Error::Pad(target, self.cap));
            }
            if self.cap < target {
                self.cap = target
            }
        }
        let new_len = std::cmp::min(target, self.cap);
        // SAFETY: `new_len <= cap`; for a grown cap the caller-provided
        // `limit` bounds it to space backed by the descriptor's tail region.
        unsafe {
            std::ptr::write_bytes(self.ptr.add(self.len), 0, new_len - self.len);
        }
        self.len = new_len;
        Ok(new_len)
    }

    /// The valid bytes as a mutable slice.
    fn as_slice_mut(&mut self) -> &mut [u8] {
        // SAFETY: `len` bytes from `ptr` are valid per the `new` contract.
        unsafe { std::slice::from_raw_parts_mut(self.ptr, self.len) }
    }
}
917
// SAFETY: a `BufferPart` has exclusive access to its byte range (contract of
// `BufferPart::new`), so moving it to another thread is sound. It is not
// marked `Sync`. NOTE(review): exclusivity relies on distinct descriptors
// never overlapping in the data VMO — confirm against `Descriptors`.
unsafe impl Send for BufferPart {}
922
923impl Debug for BufferPart {
924 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
925 let BufferPart { len, cap, ptr } = &self;
926 f.debug_struct("BufferPart").field("ptr", ptr).field("len", len).field("cap", cap).finish()
927 }
928}
929
impl<K: AllocKind> From<AllocGuard<K>> for Buffer<K> {
    /// Builds a `Buffer` view over the memory described by the allocation's
    /// descriptors: one part per descriptor, skipping each head region.
    fn from(alloc: AllocGuard<K>) -> Self {
        let AllocGuard { pool, descs: _ } = &alloc;
        let parts: Chained<BufferPart> = alloc
            .descriptors()
            .map(|descriptor| {
                let offset = usize::try_from(descriptor.offset()).unwrap();
                let head_length = usize::from(descriptor.head_length());
                let data_length = usize::try_from(descriptor.data_length()).unwrap();
                // Tx buffers start logically empty; rx buffers already hold
                // `data_length` valid bytes from the device.
                let len = match K::REFL {
                    AllocKindRefl::Tx => 0,
                    AllocKindRefl::Rx => data_length,
                };
                assert!(
                    offset + head_length <= pool.bytes,
                    "buffer part starts beyond the end of pool"
                );
                assert!(
                    offset + head_length + data_length <= pool.bytes,
                    "buffer part ends beyond the end of pool"
                );
                // SAFETY: the asserts above prove the part lies inside the
                // pool mapping, which `alloc`'s Arc<Pool> keeps alive for the
                // buffer's lifetime.
                unsafe {
                    BufferPart::new(pool.base.as_ptr().add(offset + head_length), data_length, len)
                }
            })
            .collect();
        Self { alloc, parts, pos: 0 }
    }
}
967
impl<K: AllocKind> Read for Buffer<K> {
    /// Reads exactly `buf.len()` bytes at the cursor, then advances it.
    ///
    /// NOTE(review): unlike the usual `Read` contract, this never performs a
    /// short read — if fewer than `buf.len()` valid bytes remain it fails
    /// with `InvalidInput` instead of returning what is available.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        self.read_at(self.pos, buf)
            .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidInput, error))?;
        self.pos += buf.len();
        Ok(buf.len())
    }
}
976
impl Write for Buffer<Tx> {
    /// Writes all of `buf` at the cursor, then advances it.
    ///
    /// NOTE(review): all-or-nothing like `read` above — a write that does not
    /// fit fails with `InvalidInput` rather than being shortened.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.write_at(self.pos, buf)
            .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidInput, error))?;
        self.pos += buf.len();
        Ok(buf.len())
    }

    // Writes go straight to the mapped VMO; nothing is buffered here.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
989
impl<K: AllocKind> Seek for Buffer<K> {
    /// Moves the cursor used by `Read`/`Write`. `SeekFrom::End` is relative
    /// to the capacity, not to the valid length.
    ///
    /// NOTE(review): a seek that resolves to before the start panics in the
    /// `u64::try_from(...).unwrap()` rather than returning an error.
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        let pos = match pos {
            SeekFrom::Start(pos) => pos,
            SeekFrom::End(offset) => {
                let end = i64::try_from(self.cap())
                    .map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
                u64::try_from(end.wrapping_add(offset)).unwrap()
            }
            SeekFrom::Current(offset) => {
                let current = i64::try_from(self.pos)
                    .map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
                u64::try_from(current.wrapping_add(offset)).unwrap()
            }
        };
        self.pos =
            usize::try_from(pos).map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
        Ok(pos)
    }
}
1010
/// A queued tx allocation request, fulfilled by `Pool::free_tx` when enough
/// descriptors become free.
struct TxAllocReq {
    // Oneshot used to deliver the allocation to the waiting `alloc_tx`.
    sender: Sender<AllocGuard<Tx>>,
    // Requested chain length.
    size: ChainLength,
}
1016
1017impl TxAllocReq {
1018 fn new(size: ChainLength) -> (Self, Receiver<AllocGuard<Tx>>) {
1019 let (sender, receiver) = channel();
1020 (TxAllocReq { sender, size }, receiver)
1021 }
1022
1023 fn fulfill(self, guard: AllocGuard<Tx>) {
1031 let Self { sender, size: _ } = self;
1032 match sender.send(guard) {
1033 Ok(()) => (),
1034 Err(guard) => {
1035 drop(guard);
1038 }
1039 }
1040 }
1041}
1042
/// Sealed-trait machinery: only `Rx` and `Tx` may implement `AllocKind`, and
/// only this module can construct an `Allocation`.
mod private {
    use super::{AllocKind, Rx, Tx};
    pub trait Sealed: 'static + Sized {}
    impl Sealed for Rx {}
    impl Sealed for Tx {}

    // Wrapper handed to `AllocKind::free`; constructible only inside this
    // module, so `free` can't be called with an arbitrary guard.
    pub struct Allocation<'a, K: AllocKind>(pub(super) &'a mut super::AllocGuard<K>);
}
1056
/// Marker trait distinguishing tx from rx allocations (sealed; implemented
/// only by `Tx` and `Rx`).
pub trait AllocKind: private::Sealed {
    /// Runtime reflection of the kind, used where `K` must be inspected.
    const REFL: AllocKindRefl;

    /// Returns the allocation's descriptors to the pool.
    fn free(alloc: private::Allocation<'_, Self>);
}
1066
/// Uninhabited marker type for transmit allocations.
pub enum Tx {}
/// Uninhabited marker type for receive allocations.
pub enum Rx {}
1071
/// Value-level counterpart of `AllocKind` (see `AllocKind::REFL`).
pub enum AllocKindRefl {
    Tx,
    Rx,
}
1077
1078impl AllocKindRefl {
1079 pub(in crate::session) fn as_str(&self) -> &'static str {
1080 match self {
1081 AllocKindRefl::Tx => "Tx",
1082 AllocKindRefl::Rx => "Rx",
1083 }
1084 }
1085}
1086
impl AllocKind for Tx {
    const REFL: AllocKindRefl = AllocKindRefl::Tx;

    // Returns the chain to the tx free list, leaving the guard empty so its
    // `Drop` does nothing further.
    fn free(alloc: private::Allocation<'_, Self>) {
        let private::Allocation(AllocGuard { pool, descs }) = alloc;
        pool.free_tx(std::mem::replace(descs, Chained::empty()));
    }
}
1095
impl AllocKind for Rx {
    const REFL: AllocKindRefl = AllocKindRefl::Rx;

    // Hands the chain back to the device queue and counts the completed rx
    // frame for lease handling.
    fn free(alloc: private::Allocation<'_, Self>) {
        let private::Allocation(AllocGuard { pool, descs }) = alloc;
        pool.free_rx(std::mem::replace(descs, Chained::empty()));
        pool.rx_leases.rx_complete();
    }
}
1105
/// Shared state backing rx power-lease handling.
pub(in crate::session) struct RxLeaseHandlingState {
    // True until a watcher claims the single watch slot; initialized from
    // the session's RECEIVE_RX_POWER_LEASES flag.
    can_watch_rx_leases: AtomicBool,
    // Count of completed rx frames; temporarily biased downwards while a
    // watcher waits (see `RxLeaseWatcher::wait_until`).
    rx_frame_counter: AtomicU64,
    // Waker registered by a pending `wait_until` future.
    rx_lease_waker: AtomicWaker,
}
1121
impl RxLeaseHandlingState {
    /// Enables lease watching iff the session asked for rx power leases.
    fn new_with_flags(flags: netdev::SessionFlags) -> Self {
        Self::new_with_enabled(flags.contains(netdev::SessionFlags::RECEIVE_RX_POWER_LEASES))
    }

    fn new_with_enabled(enabled: bool) -> Self {
        Self {
            can_watch_rx_leases: AtomicBool::new(enabled),
            rx_frame_counter: AtomicU64::new(0),
            rx_lease_waker: AtomicWaker::new(),
        }
    }

    /// Records completion of one rx frame.
    fn rx_complete(&self) {
        let Self { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } = self;
        let prev = rx_frame_counter.fetch_add(1, atomic::Ordering::SeqCst);

        // While a watcher is waiting the counter is biased into wrapped
        // (huge) territory; the increment that wraps it past u64::MAX is the
        // moment the watcher's threshold is crossed, so only then wake. See
        // `RxLeaseWatcher::wait_until` for the matching arithmetic.
        if prev == u64::MAX {
            rx_lease_waker.wake();
        }
    }
}
1148
/// Abstracts over owners of an `RxLeaseHandlingState` so `RxLeaseWatcher`
/// can hold either the state itself or an `Arc<Pool>`.
pub(in crate::session) trait RxLeaseHandlingStateContainer {
    fn lease_handling_state(&self) -> &RxLeaseHandlingState;
}
1154
1155impl<T: Borrow<RxLeaseHandlingState>> RxLeaseHandlingStateContainer for T {
1156 fn lease_handling_state(&self) -> &RxLeaseHandlingState {
1157 self.borrow()
1158 }
1159}
1160
// A pool handle exposes its embedded lease-handling state.
impl RxLeaseHandlingStateContainer for Arc<Pool> {
    fn lease_handling_state(&self) -> &RxLeaseHandlingState {
        &self.rx_leases
    }
}
1166
/// Exclusive watcher of rx frame completions; at most one may exist per
/// state (enforced in `RxLeaseWatcher::new`).
pub(in crate::session) struct RxLeaseWatcher<T> {
    state: T,
}
1171
impl<T: RxLeaseHandlingStateContainer> RxLeaseWatcher<T> {
    /// Claims the single watch slot.
    ///
    /// Panics if watching is disabled for the session or another watcher has
    /// already claimed the slot (the flag is swapped to false and never set
    /// back).
    pub(in crate::session) fn new(state: T) -> Self {
        assert!(
            state.lease_handling_state().can_watch_rx_leases.swap(false, atomic::Ordering::SeqCst),
            "can't watch rx leases"
        );
        Self { state }
    }

    /// Waits until at least `hold_until_frame` rx frames have completed.
    ///
    /// Implementation: the shared counter is temporarily biased down by
    /// `hold_until_frame` (restored by the scopeguard, also on cancellation).
    /// If the pre-bias count was already sufficient, return immediately.
    /// Otherwise the biased counter sits in wrapped-around (huge) territory
    /// below `threshold = prev.wrapping_sub(hold_until_frame)`... each
    /// `rx_complete` increments it until it wraps past `u64::MAX` back under
    /// `threshold`, and the wrap is exactly when `rx_complete` wakes us.
    pub(in crate::session) async fn wait_until(&mut self, hold_until_frame: u64) {
        let RxLeaseHandlingState { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } =
            self.state.lease_handling_state();

        let prev = rx_frame_counter.fetch_sub(hold_until_frame, atomic::Ordering::SeqCst);
        // Undo the bias no matter how this future ends.
        let _guard = scopeguard::guard((), |()| {
            let _: u64 = rx_frame_counter.fetch_add(hold_until_frame, atomic::Ordering::SeqCst);
        });

        if prev >= hold_until_frame {
            return;
        }
        let threshold = prev.wrapping_sub(hold_until_frame);
        futures::future::poll_fn(|cx| {
            let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
            if v < threshold {
                return Poll::Ready(());
            }
            // Standard AtomicWaker pattern: register, then re-check to close
            // the race with a concurrent `rx_complete` wake.
            rx_lease_waker.register(cx.waker());
            let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
            if v < threshold {
                return Poll::Ready(());
            }
            Poll::Pending
        })
        .await;
    }
}
1236
1237#[cfg(test)]
1238mod tests {
1239 use super::*;
1240
1241 use assert_matches::assert_matches;
1242 use fuchsia_async as fasync;
1243 use futures::future::FutureExt;
1244 use test_case::test_case;
1245
1246 use std::collections::HashSet;
1247 use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize};
1248 use std::pin::pin;
1249 use std::task::{Poll, Waker};
1250
    // Layout parameters for the default test pool.
    const DEFAULT_MIN_TX_BUFFER_HEAD: u16 = 4;
    const DEFAULT_MIN_TX_BUFFER_TAIL: u16 = 8;
    const DEFAULT_BUFFER_LENGTH: NonZeroUsize = NonZeroUsize::new(64).unwrap();
    const DEFAULT_TX_BUFFERS: NonZeroU16 = NonZeroU16::new(8).unwrap();
    const DEFAULT_RX_BUFFERS: NonZeroU16 = NonZeroU16::new(8).unwrap();
    // Largest usable tx buffer: a maximal chain minus the mandatory
    // head/tail reservations.
    const MAX_BUFFER_BYTES: usize = DEFAULT_BUFFER_LENGTH.get()
        * netdev::MAX_DESCRIPTOR_CHAIN as usize
        - DEFAULT_MIN_TX_BUFFER_HEAD as usize
        - DEFAULT_MIN_TX_BUFFER_TAIL as usize;

    // Byte patterns used to tell untouched, written, and padded regions apart.
    const SENTINEL_BYTE: u8 = 0xab;
    const WRITE_BYTE: u8 = 1;
    const PAD_BYTE: u8 = 0;

    // Minimal config used by most tests: 8 tx + 8 rx buffers of 64 bytes,
    // no session flags, no minimum tx data length.
    const DEFAULT_CONFIG: Config = Config {
        buffer_stride: NonZeroU64::new(DEFAULT_BUFFER_LENGTH.get() as u64).unwrap(),
        num_rx_buffers: DEFAULT_RX_BUFFERS,
        num_tx_buffers: DEFAULT_TX_BUFFERS,
        options: netdev::SessionFlags::empty(),
        buffer_layout: BufferLayout {
            length: DEFAULT_BUFFER_LENGTH.get(),
            min_tx_head: DEFAULT_MIN_TX_BUFFER_HEAD,
            min_tx_tail: DEFAULT_MIN_TX_BUFFER_TAIL,
            min_tx_data: 0,
        },
    };
1278
    // Test-only convenience constructors and synchronous allocation helpers.
    impl Pool {
        // Builds a pool from DEFAULT_CONFIG, dropping the VMO handles.
        fn new_test_default() -> Arc<Self> {
            let (pool, _descriptors, _data) =
                Pool::new(DEFAULT_CONFIG).expect("failed to create default pool");
            pool
        }

        // `alloc_tx` with a u8 chain length, validated on the way in.
        async fn alloc_tx_checked(self: &Arc<Self>, n: u8) -> AllocGuard<Tx> {
            self.alloc_tx(ChainLength::try_from(n).expect("failed to convert to chain length"))
                .await
        }

        // Non-blocking alloc: None when the future would have to wait.
        fn alloc_tx_now_or_never(self: &Arc<Self>, n: u8) -> Option<AllocGuard<Tx>> {
            self.alloc_tx_checked(n).now_or_never()
        }

        // Drains the free list into as many n-chains as it will yield.
        fn alloc_tx_all(self: &Arc<Self>, n: u8) -> Vec<AllocGuard<Tx>> {
            std::iter::from_fn(|| self.alloc_tx_now_or_never(n)).collect()
        }

        // Non-blocking buffer alloc; panics on invalid sizes rather than
        // conflating them with pool exhaustion.
        fn alloc_tx_buffer_now_or_never(self: &Arc<Self>, num_bytes: usize) -> Option<Buffer<Tx>> {
            self.alloc_tx_buffer(num_bytes)
                .now_or_never()
                .transpose()
                .expect("invalid arguments for alloc_tx_buffer")
        }

        // Adjusts min_tx_data; requires exclusive ownership of the Arc.
        fn set_min_tx_buffer_length(self: &mut Arc<Self>, length: usize) {
            Arc::get_mut(self).unwrap().buffer_layout.min_tx_data = length;
        }

        // Fills the whole data mapping with SENTINEL_BYTE so tests can tell
        // padded/written bytes from untouched ones.
        fn fill_sentinel_bytes(&mut self) {
            unsafe { std::ptr::write_bytes(self.base.as_ptr(), SENTINEL_BYTE, self.bytes) };
        }
    }
1316
    impl Buffer<Tx> {
        // Writes one byte at `offset`, pads to `pad_size`, then verifies the
        // layout: sentinel bytes before the write, the written byte at
        // `offset`, and zero padding after it.
        fn check_write_and_pad(&mut self, offset: usize, pad_size: usize) {
            self.write_at(offset, &[WRITE_BYTE][..]).expect("failed to write to self");
            self.pad().expect("failed to pad");
            assert_eq!(self.len(), pad_size);
            // Pre-fill the read buffer with a value none of the patterns use,
            // so a short read can't masquerade as success.
            const INIT_BYTE: u8 = 42;
            let mut read_buf = vec![INIT_BYTE; pad_size];
            self.read_at(0, &mut read_buf[..]).expect("failed to read from self");
            for (idx, byte) in read_buf.iter().enumerate() {
                if idx < offset {
                    assert_eq!(*byte, SENTINEL_BYTE);
                } else if idx == offset {
                    assert_eq!(*byte, WRITE_BYTE);
                } else {
                    assert_eq!(*byte, PAD_BYTE);
                }
            }
        }
    }
1341
    // Lets tests compare a descriptor chain against any iterable of raw u16
    // ids, keeping assertions terse.
    impl<K, I, T> PartialEq<T> for Chained<DescId<K>>
    where
        K: AllocKind,
        I: ExactSizeIterator<Item = u16>,
        T: Copy + IntoIterator<IntoIter = I>,
    {
        fn eq(&self, other: &T) -> bool {
            let iter = other.into_iter();
            if usize::from(self.len) != iter.len() {
                return false;
            }
            self.iter().zip(iter).all(|(l, r)| l.get() == r)
        }
    }

    // Debug for requests (test-only): shows the size; the sender is opaque.
    impl Debug for TxAllocReq {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            let TxAllocReq { sender: _, size } = self;
            f.debug_struct("TxAllocReq").field("size", &size).finish_non_exhaustive()
        }
    }
1363
1364 #[test]
1365 fn alloc_tx_distinct() {
1366 let pool = Pool::new_test_default();
1367 let allocated = pool.alloc_tx_all(1);
1368 assert_eq!(allocated.len(), DEFAULT_TX_BUFFERS.get().into());
1369 let distinct = allocated
1370 .iter()
1371 .map(|alloc| {
1372 assert_eq!(alloc.descs.len(), 1);
1373 alloc.descs[0].get()
1374 })
1375 .collect::<HashSet<u16>>();
1376 assert_eq!(allocated.len(), distinct.len());
1377 }
1378
1379 #[test]
1380 fn alloc_tx_free_len() {
1381 let pool = Pool::new_test_default();
1382 {
1383 let allocated = pool.alloc_tx_all(2);
1384 assert_eq!(
1385 allocated.iter().fold(0, |acc, a| { acc + a.descs.len() }),
1386 DEFAULT_TX_BUFFERS.get().into()
1387 );
1388 assert_eq!(pool.tx_alloc_state.lock().free_list.len, 0);
1389 }
1390 assert_eq!(pool.tx_alloc_state.lock().free_list.len, DEFAULT_TX_BUFFERS.get());
1391 }
1392
1393 #[test]
1394 fn alloc_tx_chain() {
1395 let pool = Pool::new_test_default();
1396 let allocated = pool.alloc_tx_all(3);
1397 assert_eq!(allocated.len(), usize::from(DEFAULT_TX_BUFFERS.get()) / 3);
1398 assert_matches!(pool.alloc_tx_now_or_never(3), None);
1399 assert_matches!(pool.alloc_tx_now_or_never(2), Some(a) if a.descs.len() == 2);
1400 }
1401
    #[test]
    fn alloc_tx_many() {
        let pool = Pool::new_test_default();
        // Largest data length that still fits in a single buffer once the
        // mandatory head and tail regions are reserved.
        let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
            - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
            - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
        let data_len = usize::try_from(data_len).unwrap();
        // Drain the pool: every tx descriptor becomes one single-part buffer.
        let mut buffers = pool
            .alloc_tx_buffers(data_len)
            .now_or_never()
            .expect("failed to alloc")
            .unwrap()
            .collect::<Result<Vec<_>>>()
            .expect("buffer error");
        assert_eq!(buffers.len(), DEFAULT_TX_BUFFERS.get().into());

        // With every buffer held, a new allocation future cannot complete.
        assert!(pool.alloc_tx_buffers(data_len).now_or_never().is_none());

        // Returning exactly one buffer lets the next allocation yield exactly
        // one buffer and then report exhaustion.
        assert_matches!(buffers.pop(), Some(_));
        let mut more_buffers =
            pool.alloc_tx_buffers(data_len).now_or_never().expect("failed to alloc").unwrap();
        let buffer = assert_matches!(more_buffers.next(), Some(Ok(b)) => b);
        assert_matches!(more_buffers.next(), None);
        drop(buffer);
        // The iterator does not pick up buffers freed after it was created.
        assert_matches!(more_buffers.next(), None);
    }
1435
1436 #[test]
1437 fn alloc_tx_after_free() {
1438 let pool = Pool::new_test_default();
1439 let mut allocated = pool.alloc_tx_all(1);
1440 assert_matches!(pool.alloc_tx_now_or_never(2), None);
1441 {
1442 let _drained = allocated.drain(..2);
1443 }
1444 assert_matches!(pool.alloc_tx_now_or_never(2), Some(a) if a.descs.len() == 2);
1445 }
1446
    #[test]
    fn blocking_alloc_tx() {
        let mut executor = fasync::TestExecutor::new();
        let pool = Pool::new_test_default();
        let mut allocated = pool.alloc_tx_all(1);
        // The pool is exhausted, so this allocation must park a request.
        let alloc_fut = pool.alloc_tx_checked(1);
        let mut alloc_fut = pin!(alloc_fut);
        assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
        assert!(!pool.tx_alloc_state.lock().requests.is_empty());
        // Record the id(s) of one allocation, then drop it (the guard is a
        // temporary that dies at the end of this statement, freeing the
        // descriptor back to the pool).
        let freed = allocated
            .pop()
            .expect("no fulfulled allocations")
            .iter()
            .map(|x| x.get())
            .collect::<Chained<_>>();
        let same_as_freed =
            |descs: &Chained<DescId<Tx>>| descs.iter().map(|x| x.get()).eq(freed.iter().copied());
        // The parked request must resolve with exactly the freed descriptor.
        assert_matches!(
            &executor.run_until_stalled(&mut alloc_fut),
            Poll::Ready(AllocGuard{ descs, pool: _ }) if same_as_freed(descs)
        );
        // The fulfilled request is removed from the queue.
        assert!(pool.tx_alloc_state.lock().requests.is_empty());
    }
1474
    #[test]
    fn blocking_alloc_tx_cancel_before_free() {
        let mut executor = fasync::TestExecutor::new();
        let pool = Pool::new_test_default();
        let mut allocated = pool.alloc_tx_all(1);
        {
            // Park an allocation request, then drop the future before any
            // descriptor is freed (end of this scope).
            let alloc_fut = pool.alloc_tx_checked(1);
            let mut alloc_fut = pin!(alloc_fut);
            assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
            // Two queued requests: the live one plus a cancelled leftover
            // from alloc_tx_all exhausting the pool (see the cancellation
            // check in multiple_blocking_alloc_tx_fulfill_order).
            assert_matches!(
                pool.tx_alloc_state.lock().requests.as_slices(),
                (&[ref req1, ref req2], &[]) if req1.size.get() == 1 && req2.size.get() == 1
            );
        }
        // Free one descriptor after the future was dropped.
        assert_matches!(
            allocated.pop(),
            Some(AllocGuard { ref descs, pool: ref p })
            if descs == &[DEFAULT_TX_BUFFERS.get() - 1] && Arc::ptr_eq(p, &pool)
        );
        // The freed descriptor lands on the free list rather than being lost
        // to the cancelled request, and the request queue is purged.
        let state = pool.tx_alloc_state.lock();
        assert_eq!(state.free_list.len, 1);
        assert!(state.requests.is_empty());
    }
1498
    #[test]
    fn blocking_alloc_tx_cancel_after_free() {
        let mut executor = fasync::TestExecutor::new();
        let pool = Pool::new_test_default();
        let mut allocated = pool.alloc_tx_all(1);
        {
            // Park a request, free a descriptor that could fulfill it, but
            // drop the future (end of scope) without polling it again.
            let alloc_fut = pool.alloc_tx_checked(1);
            let mut alloc_fut = pin!(alloc_fut);
            assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
            // Two queued requests: the live one plus a cancelled leftover
            // from alloc_tx_all exhausting the pool.
            assert_matches!(
                pool.tx_alloc_state.lock().requests.as_slices(),
                (&[ref req1, ref req2], &[]) if req1.size.get() == 1 && req2.size.get() == 1
            );
            assert_matches!(
                allocated.pop(),
                Some(AllocGuard { ref descs, pool: ref p })
                if descs == &[DEFAULT_TX_BUFFERS.get() - 1] && Arc::ptr_eq(p, &pool)
            );
        }
        // Even though the request was cancelled after the free, the
        // descriptor must end up back on the free list, not leak.
        let state = pool.tx_alloc_state.lock();
        assert_eq!(state.free_list.len, 1);
        assert!(state.requests.is_empty());
    }
1522
    #[test]
    fn multiple_blocking_alloc_tx_fulfill_order() {
        const TASKS_TOTAL: usize = 3;
        let mut executor = fasync::TestExecutor::new();
        let pool = Pool::new_test_default();
        let mut allocated = pool.alloc_tx_all(1);
        // Queue blocking requests of sizes 3, 2, 1 — in that order.
        let mut alloc_futs = (1..=TASKS_TOTAL)
            .rev()
            .map(|x| {
                let pool = pool.clone();
                (x, Box::pin(async move { pool.alloc_tx_checked(x.try_into().unwrap()).await }))
            })
            .collect::<Vec<_>>();

        for (idx, (req_size, task)) in alloc_futs.iter_mut().enumerate() {
            assert_matches!(executor.run_until_stalled(task), Poll::Pending);
            // Sanity-check the ordering: position 0 asked for 3, 1 for 2, ...
            assert_eq!(idx + *req_size, TASKS_TOTAL);
        }
        {
            let state = pool.tx_alloc_state.lock();
            // One extra entry: a cancelled leftover from alloc_tx_all
            // exhausting the pool; all our queued futures are still live.
            assert_eq!(state.requests.len(), TASKS_TOTAL + 1);
            let mut requests = state.requests.iter();
            assert!(requests.next().unwrap().sender.is_canceled());
            assert!(requests.all(|req| !req.sender.is_canceled()))
        }

        let mut to_free = Vec::new();
        let mut freed = 0;
        // Fulfillment must be strictly FIFO: the head request (size
        // `free_size`) completes exactly when its size-th descriptor is
        // freed, and later (smaller) requests never jump the queue.
        for free_size in (1..=TASKS_TOTAL).rev() {
            let (_req_size, mut task) = alloc_futs.remove(0);
            for _ in 1..free_size {
                freed += 1;
                assert_matches!(
                    allocated.pop(),
                    Some(AllocGuard { ref descs, pool: ref p })
                    if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
                );
                // Not enough descriptors freed yet for the head request.
                assert_matches!(executor.run_until_stalled(&mut task), Poll::Pending);
            }
            freed += 1;
            assert_matches!(
                allocated.pop(),
                Some(AllocGuard { ref descs, pool: ref p })
                if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
            );
            match executor.run_until_stalled(&mut task) {
                Poll::Ready(alloc) => {
                    assert_eq!(alloc.len(), free_size);
                    // Hold the fulfilled allocation so its descriptors can't
                    // be recycled into the remaining requests.
                    to_free.push(alloc);
                }
                Poll::Pending => panic!("The request should be fulfilled"),
            }
            // Every request still in the queue must remain pending.
            for (_req_size, task) in alloc_futs.iter_mut() {
                assert_matches!(executor.run_until_stalled(task), Poll::Pending);
            }
        }
        assert!(pool.tx_alloc_state.lock().requests.is_empty());
    }
1588
    #[test]
    fn singleton_tx_layout() {
        let pool = Pool::new_test_default();
        // Repeatedly allocate the largest data length that fits in a single
        // descriptor until the pool is exhausted, validating each buffer's
        // descriptor fields and data-pointer layout.
        let buffers = std::iter::from_fn(|| {
            let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
                - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
                - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
            pool.alloc_tx_buffer_now_or_never(usize::try_from(data_len).unwrap()).map(|buffer| {
                assert_eq!(buffer.alloc.descriptors().count(), 1);
                // Buffers are laid out back to back in the VMO, indexed by
                // descriptor id.
                let offset = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
                    * u64::from(buffer.alloc[0].get());
                {
                    let descriptor = buffer.alloc.descriptor();
                    assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
                    assert_eq!(descriptor.head_length(), DEFAULT_MIN_TX_BUFFER_HEAD);
                    assert_eq!(descriptor.tail_length(), DEFAULT_MIN_TX_BUFFER_TAIL);
                    assert_eq!(descriptor.data_length(), data_len);
                    assert_eq!(descriptor.offset(), offset);
                }

                // A fresh buffer has a single empty part whose data pointer
                // is pool base + buffer offset + head reservation.
                assert_eq!(buffer.parts.len(), 1);
                let BufferPart { ptr, len, cap } = buffer.parts[0];
                assert_eq!(len, 0);
                assert_eq!(
                    pool.base.as_ptr().wrapping_add(
                        usize::try_from(offset).unwrap() + usize::from(DEFAULT_MIN_TX_BUFFER_HEAD),
                    ),
                    ptr
                );
                assert_eq!(data_len, u32::try_from(cap).unwrap());
                buffer
            })
        })
        .collect::<Vec<_>>();
        assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()));
    }
1627
    #[test]
    fn chained_tx_layout() {
        let pool = Pool::new_test_default();
        // An allocation that spans exactly four buffers once the head and
        // tail reservations are carved out.
        let alloc_len = 4 * DEFAULT_BUFFER_LENGTH.get()
            - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
            - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL);
        let buffers = std::iter::from_fn(|| {
            pool.alloc_tx_buffer_now_or_never(alloc_len).map(|buffer| {
                assert_eq!(buffer.parts.len(), 4);
                for (idx, descriptor) in buffer.alloc.descriptors().enumerate() {
                    // chain_length counts the descriptors that FOLLOW this
                    // one; only the first part carries the head reservation
                    // and only the last carries the tail.
                    let chain_length = ChainLength::try_from(buffer.alloc.len() - idx - 1).unwrap();
                    let head_length = if idx == 0 { DEFAULT_MIN_TX_BUFFER_HEAD } else { 0 };
                    let tail_length = if chain_length == ChainLength::ZERO {
                        DEFAULT_MIN_TX_BUFFER_TAIL
                    } else {
                        0
                    };
                    let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
                        - u32::from(head_length)
                        - u32::from(tail_length);
                    let offset = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
                        * u64::from(buffer.alloc[idx].get());
                    assert_eq!(descriptor.chain_length().unwrap(), chain_length);
                    assert_eq!(descriptor.head_length(), head_length);
                    assert_eq!(descriptor.tail_length(), tail_length);
                    assert_eq!(descriptor.offset(), offset);
                    assert_eq!(descriptor.data_length(), data_len);
                    // Every non-terminal descriptor links to the next one.
                    if chain_length != ChainLength::ZERO {
                        assert_eq!(descriptor.nxt(), Some(buffer.alloc[idx + 1].get()));
                    }

                    // Each part starts empty, pointing past its descriptor's
                    // head reservation, with capacity equal to data length.
                    let BufferPart { ptr, cap, len } = buffer.parts[idx];
                    assert_eq!(len, 0);
                    assert_eq!(
                        pool.base.as_ptr().wrapping_add(
                            usize::try_from(offset).unwrap() + usize::from(head_length),
                        ),
                        ptr
                    );
                    assert_eq!(data_len, u32::try_from(cap).unwrap());
                }
                buffer
            })
        })
        .collect::<Vec<_>>();
        assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()) / 4);
    }
1677
1678 #[test]
1679 fn rx_distinct() {
1680 let pool = Pool::new_test_default();
1681 let mut guard = pool.rx_pending.inner.lock();
1682 let (descs, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
1683 assert_eq!(descs.len(), usize::from(DEFAULT_RX_BUFFERS.get()));
1684 let distinct = descs.iter().map(|desc| desc.get()).collect::<HashSet<u16>>();
1685 assert_eq!(descs.len(), distinct.len());
1686 }
1687
1688 #[test]
1689 fn alloc_rx_layout() {
1690 let pool = Pool::new_test_default();
1691 let mut guard = pool.rx_pending.inner.lock();
1692 let (descs, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
1693 assert_eq!(descs.len(), usize::from(DEFAULT_RX_BUFFERS.get()));
1694 for desc in descs.iter() {
1695 let descriptor = pool.descriptors.borrow(desc);
1696 let offset =
1697 u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap() * u64::from(desc.get());
1698 assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
1699 assert_eq!(descriptor.head_length(), 0);
1700 assert_eq!(descriptor.tail_length(), 0);
1701 assert_eq!(descriptor.offset(), offset);
1702 assert_eq!(
1703 descriptor.data_length(),
1704 u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1705 );
1706 }
1707 }
1708
1709 #[test]
1710 fn buffer_read_at_write_at() {
1711 let pool = Pool::new_test_default();
1712 let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
1713 let mut buffer =
1714 pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
1715 assert_eq!(buffer.parts.len(), 2);
1718 assert_eq!(buffer.cap(), alloc_bytes);
1719 let write_buf = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();
1720 buffer.write_at(0, &write_buf[..]).expect("failed to write into buffer");
1721 let mut read_buf = [0xff; DEFAULT_BUFFER_LENGTH.get()];
1722 buffer.read_at(0, &mut read_buf[..]).expect("failed to read from buffer");
1723 for (idx, byte) in read_buf.iter().enumerate() {
1724 assert_eq!(*byte, write_buf[idx]);
1725 }
1726 }
1727
    #[test]
    fn buffer_read_write_seek() {
        let pool = Pool::new_test_default();
        let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
        let mut buffer =
            pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
        assert_eq!(buffer.parts.len(), 2);
        assert_eq!(buffer.cap(), alloc_bytes);
        // Fill the buffer with an incrementing byte pattern via `Write`,
        // leaving the cursor at the end.
        let write_buf = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();
        assert_eq!(
            buffer.write(&write_buf[..]).expect("failed to write into buffer"),
            write_buf.len()
        );
        const SEEK_FROM_END: usize = 64;
        const READ_LEN: usize = 12;
        // Seek relative to the end; the returned position is absolute.
        assert_eq!(
            buffer.seek(SeekFrom::End(-i64::try_from(SEEK_FROM_END).unwrap())).unwrap(),
            u64::try_from(buffer.cap() - SEEK_FROM_END).unwrap()
        );
        let mut read_buf = [0xff; READ_LEN];
        assert_eq!(
            buffer.read(&mut read_buf[..]).expect("failed to read from buffer"),
            read_buf.len()
        );
        // NOTE(review): comparing against the START of the pattern only lines
        // up if cap() == SEEK_FROM_END (i.e. DEFAULT_BUFFER_LENGTH is 64) so
        // the seek lands at offset 0 — confirm if the default ever changes.
        assert_eq!(&write_buf[..READ_LEN], &read_buf[..]);
    }
1756
1757 #[test_case(32; "single buffer part")]
1758 #[test_case(MAX_BUFFER_BYTES; "multiple buffer parts")]
1759 fn buffer_pad(pad_size: usize) {
1760 let mut pool = Pool::new_test_default();
1761 pool.set_min_tx_buffer_length(pad_size);
1762 for offset in 0..pad_size {
1763 Arc::get_mut(&mut pool)
1764 .expect("there are multiple owners of the underlying VMO")
1765 .fill_sentinel_bytes();
1766 let mut buffer =
1767 pool.alloc_tx_buffer_now_or_never(pad_size).expect("failed to allocate buffer");
1768 buffer.check_write_and_pad(offset, pad_size);
1769 }
1770 }
1771
    #[test]
    fn buffer_pad_grow() {
        // Pad a chained (3-part) buffer: padding must extend through all the
        // parts, not just the part containing the write cursor.
        const BUFFER_PARTS: u8 = 3;
        let mut pool = Pool::new_test_default();
        // Total data capacity of a 3-part chain after head/tail reservations.
        let pad_size = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
            * u32::from(BUFFER_PARTS)
            - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
            - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
        pool.set_min_tx_buffer_length(pad_size.try_into().unwrap());
        for offset in 0..pad_size - u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap() {
            // Repaint the VMO with sentinel bytes so the previous iteration
            // cannot mask a missing pad write; requires exclusive ownership.
            Arc::get_mut(&mut pool)
                .expect("there are multiple owners of the underlying VMO")
                .fill_sentinel_bytes();
            let mut alloc =
                pool.alloc_tx_now_or_never(BUFFER_PARTS).expect("failed to alloc descriptors");
            alloc
                .init(
                    DEFAULT_BUFFER_LENGTH.get() * usize::from(BUFFER_PARTS)
                        - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
                        - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL),
                )
                .expect("head/body/tail sizes are representable with u16/u32/u16");
            let mut buffer = Buffer::try_from(alloc).unwrap();
            buffer.check_write_and_pad(offset.try_into().unwrap(), pad_size.try_into().unwrap());
        }
    }
1798
    #[test_case( 0; "writes at the beginning")]
    #[test_case( 15; "writes in the first part")]
    #[test_case( 75; "writes in the second part")]
    #[test_case(135; "writes in the third part")]
    #[test_case(195; "writes in the last part")]
    fn buffer_used(write_offset: usize) {
        let pool = Pool::new_test_default();
        let mut buffer =
            pool.alloc_tx_buffer_now_or_never(MAX_BUFFER_BYTES).expect("failed to allocate buffer");
        // Per-part capacities: the first part loses the head reservation, the
        // last loses the tail, and middle parts span a whole buffer each.
        let expected_caps = (0..netdev::MAX_DESCRIPTOR_CHAIN).map(|i| {
            if i == 0 {
                DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
            } else if i < netdev::MAX_DESCRIPTOR_CHAIN - 1 {
                DEFAULT_BUFFER_LENGTH.get()
            } else {
                DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL)
            }
        });
        assert_eq!(buffer.alloc.len(), netdev::MAX_DESCRIPTOR_CHAIN.into());
        buffer.write_at(write_offset, &[WRITE_BYTE][..]).expect("failed to write to buffer");
        // Walk the parts carrying the still-unconsumed offset: parts wholly
        // before the write are fully used (len == cap), the part containing
        // the write has len == offset + 1 and holds the written byte, and
        // every later part is untouched (len == 0). The fold must consume
        // the entire offset, hence the final `None`.
        assert_eq!(
            buffer.parts.iter().zip(expected_caps).fold(
                Some(write_offset),
                |offset, (part, expected_cap)| {
                    assert_eq!(part.cap, expected_cap);

                    match offset {
                        Some(offset) => {
                            if offset >= expected_cap {
                                // Write lies beyond this part: fully used.
                                assert_eq!(part.len, part.cap);
                                Some(offset - part.len)
                            } else {
                                // Write lands in this part.
                                assert_eq!(part.len, offset + 1);
                                let mut buf = [0];
                                assert_matches!(part.read_at(offset, &mut buf), Ok(1));
                                assert_eq!(buf[0], WRITE_BYTE);
                                None
                            }
                        }
                        None => {
                            // Past the written part: nothing used.
                            assert_eq!(part.len, 0);
                            None
                        }
                    }
                }
            ),
            None
        )
    }
1855
1856 #[test]
1857 fn buffer_commit() {
1858 let pool = Pool::new_test_default();
1859 for offset in 0..MAX_BUFFER_BYTES {
1860 let mut buffer = pool
1861 .alloc_tx_buffer_now_or_never(MAX_BUFFER_BYTES)
1862 .expect("failed to allocate buffer");
1863 buffer.write_at(offset, &[1][..]).expect("failed to write to buffer");
1864 buffer.commit();
1865 for (part, descriptor) in buffer.parts.iter().zip(buffer.alloc.descriptors()) {
1866 let head_length = descriptor.head_length();
1867 let tail_length = descriptor.tail_length();
1868 let data_length = descriptor.data_length();
1869 assert_eq!(u32::try_from(part.len).unwrap(), data_length);
1870 assert_eq!(
1871 u32::from(head_length + tail_length) + data_length,
1872 u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap(),
1873 );
1874 }
1875 }
1876 }
1877
    #[test]
    fn allocate_under_device_minimum() {
        // The device demands at least MIN_TX_DATA bytes of tx data while the
        // caller only asks for ALLOC_SIZE; `pad` must grow the buffer to the
        // device minimum and zero-fill the grown region.
        const MIN_TX_DATA: usize = 32;
        const ALLOC_SIZE: usize = 16;
        const WRITE_BYTE: u8 = 0xff;
        const WRITE_SENTINAL_BYTE: u8 = 0xee;
        const READ_SENTINAL_BYTE: u8 = 0xdd;
        let mut config = DEFAULT_CONFIG;
        config.buffer_layout.min_tx_data = MIN_TX_DATA;
        let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create a new pool");
        // Dirty every buffer in the pool with sentinel bytes so the zero-fill
        // below is observable (a freshly zeroed VMO would mask its absence).
        for mut buffer in Vec::from_iter(std::iter::from_fn({
            let pool = pool.clone();
            move || pool.alloc_tx_buffer_now_or_never(MIN_TX_DATA)
        })) {
            buffer.write_at(0, &[WRITE_SENTINAL_BYTE; MIN_TX_DATA]).expect("failed to write");
        }
        let mut allocated =
            pool.alloc_tx_buffer_now_or_never(16).expect("failed to allocate buffer");
        assert_eq!(allocated.cap(), ALLOC_SIZE);
        // Writes past the (pre-pad) capacity are rejected with TooSmall.
        const WRITE_BUF_SIZE: usize = ALLOC_SIZE + 1;
        assert_matches!(
            allocated.write_at(0, &[WRITE_BYTE; WRITE_BUF_SIZE]),
            Err(Error::TooSmall { size: ALLOC_SIZE, offset: 0, length: WRITE_BUF_SIZE })
        );
        allocated.write_at(0, &[WRITE_BYTE; ALLOC_SIZE]).expect("failed to write to buffer");
        // Padding grows both capacity and length to the device minimum.
        assert_matches!(allocated.pad(), Ok(()));
        assert_eq!(allocated.cap(), MIN_TX_DATA);
        assert_eq!(allocated.len(), MIN_TX_DATA);
        // Reads past the padded length are rejected with TooSmall.
        const READ_BUF_SIZE: usize = MIN_TX_DATA + 1;
        let mut read_buf = [READ_SENTINAL_BYTE; READ_BUF_SIZE];
        assert_matches!(
            allocated.read_at(0, &mut read_buf[..]),
            Err(Error::TooSmall { size: MIN_TX_DATA, offset: 0, length: READ_BUF_SIZE })
        );
        allocated.read_at(0, &mut read_buf[..MIN_TX_DATA]).expect("failed to read from buffer");
        // Layout read back: written prefix, then zeroed padding, then the
        // untouched sentinel byte past the buffer's end.
        assert_eq!(&read_buf[..ALLOC_SIZE], &[WRITE_BYTE; ALLOC_SIZE][..]);
        assert_eq!(&read_buf[ALLOC_SIZE..MIN_TX_DATA], &[0x0; ALLOC_SIZE][..]);
        assert_eq!(&read_buf[MIN_TX_DATA..], &[READ_SENTINAL_BYTE; 1][..]);
    }
1917
1918 #[test]
1919 fn invalid_tx_length() {
1920 let mut config = DEFAULT_CONFIG;
1921 config.buffer_layout.length = usize::from(u16::MAX) + 2;
1922 config.buffer_layout.min_tx_head = 0;
1923 let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create pool");
1924 assert_matches!(pool.alloc_tx_buffer(1).now_or_never(), Some(Err(Error::TxLength)));
1925 }
1926
    #[test]
    fn rx_leases() {
        let mut executor = fuchsia_async::TestExecutor::new();
        let state = RxLeaseHandlingState::new_with_enabled(true);
        let mut watcher = RxLeaseWatcher { state: &state };

        {
            // Waiting for a frame count already reached resolves immediately.
            let mut fut = pin!(watcher.wait_until(0));
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
        }
        {
            // Completing one rx frame satisfies a wait for 1.
            state.rx_complete();
            let mut fut = pin!(watcher.wait_until(1));
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
        }
        {
            // Thresholds below the current counter are also immediate.
            let mut fut = pin!(watcher.wait_until(0));
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
        }
        {
            // A wait for a future threshold stays pending until enough rx
            // completions have accumulated, then resolves.
            let mut fut = pin!(watcher.wait_until(3));
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
            state.rx_complete();
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
            state.rx_complete();
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
        }
        // Dropping a still-pending wait must leave the frame counter
        // untouched.
        let counter_before = state.rx_frame_counter.load(atomic::Ordering::SeqCst);
        {
            let mut fut = pin!(watcher.wait_until(10000));
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
        }
        let counter_after = state.rx_frame_counter.load(atomic::Ordering::SeqCst);
        assert_eq!(counter_before, counter_after);
    }
1964}