use fuchsia_sync::Mutex;
use futures::task::AtomicWaker;
use std::borrow::Borrow;
use std::collections::VecDeque;
use std::convert::TryInto as _;
use std::fmt::Debug;
use std::io::{Read, Seek, SeekFrom, Write};
use std::mem::MaybeUninit;
use std::num::TryFromIntError;
use std::ops::{Deref, DerefMut};
use std::ptr::NonNull;
use std::sync::Arc;
use std::sync::atomic::{self, AtomicBool, AtomicU64};
use std::task::Poll;

use arrayvec::ArrayVec;
use explicit::ResultExt as _;
use fidl_fuchsia_hardware_network as netdev;
use fuchsia_runtime::vmar_root_self;
use futures::channel::oneshot::{Receiver, Sender, channel};

use super::{ChainLength, DescId, DescRef, DescRefMut, Descriptors};
use crate::error::{Error, Result};
use crate::session::{BufferLayout, Config, Pending, Port};

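/// The shared buffer pool backing a netdevice session: it owns the mapped
/// data VMO, the descriptor table, and the allocation state for both tx and
/// rx buffers.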
pub(in crate::session) struct Pool {
    /// Base address of the mapped data VMO.
    base: NonNull<u8>,
    /// Length of the mapping, in bytes.
    bytes: usize,
    /// The descriptor table shared with the device.
    descriptors: Descriptors,
    /// Free tx descriptors and pending tx allocation requests.
    tx_alloc_state: Mutex<TxAllocState>,
    /// Rx descriptors waiting to be handed to the device.
    pub(in crate::session) rx_pending: Pending<Rx>,
    /// The buffer layout negotiated for this session.
    buffer_layout: BufferLayout,
    /// State used to implement rx lease watching.
    rx_leases: RxLeaseHandlingState,
}

// SAFETY: Pool owns the mapped VMO region; access to the memory is only
// granted through descriptors and buffers that synchronize through the
// pool's internal state, so it can be sent and shared across threads.
unsafe impl Send for Pool {}
unsafe impl Sync for Pool {}

/// Tx allocation state: the free list plus requests waiting for descriptors.
struct TxAllocState {
    requests: VecDeque<TxAllocReq>,
    free_list: TxFreeList,
}

/// A singly-linked list of free tx descriptors, threaded through the
/// descriptors' `nxt` fields.
struct TxFreeList {
    /// The head of the free list, if any descriptors are free.
    head: Option<DescId<Tx>>,
    /// The number of descriptors in the free list.
    len: u16,
}

impl Pool {
    pub(in crate::session) fn new(config: Config) -> Result<(Arc<Self>, zx::Vmo, zx::Vmo)> {
        let Config { buffer_stride, num_rx_buffers, num_tx_buffers, options, buffer_layout } =
            config;
        let num_buffers = num_rx_buffers.get() + num_tx_buffers.get();
        let (descriptors, descriptors_vmo, tx_free, mut rx_free) =
            Descriptors::new(num_tx_buffers, num_rx_buffers, buffer_stride)?;

        let free_head = tx_free.into_iter().rev().fold(None, |head, mut curr| {
            descriptors.borrow_mut(&mut curr).set_nxt(head);
            Some(curr)
        });

        for rx_desc in rx_free.iter_mut() {
            descriptors.borrow_mut(rx_desc).initialize(
                ChainLength::ZERO,
                0,
                buffer_layout.length.try_into().unwrap(),
                0,
            );
        }

        let tx_alloc_state = TxAllocState {
            free_list: TxFreeList { head: free_head, len: num_tx_buffers.get() },
            requests: VecDeque::new(),
        };

        let size = buffer_stride.get() * u64::from(num_buffers);
        let data_vmo = zx::Vmo::create(size).map_err(|status| Error::Vmo("data", status))?;

        const VMO_NAME: zx::Name =
            const_unwrap::const_unwrap_result(zx::Name::new("netdevice:data"));
        data_vmo.set_name(&VMO_NAME).map_err(|status| Error::Vmo("set name", status))?;
        let len = isize::try_from(size).expect("VMO size larger than isize::MAX") as usize;
        let base = NonNull::new(
            vmar_root_self()
                .map(0, &data_vmo, 0, len, zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE)
                .map_err(|status| Error::Map("data", status))? as *mut u8,
        )
        .unwrap();

        Ok((
            Arc::new(Pool {
                base,
                bytes: len,
                descriptors,
                tx_alloc_state: Mutex::new(tx_alloc_state),
                rx_pending: Pending::new(rx_free),
                buffer_layout,
                rx_leases: RxLeaseHandlingState::new_with_flags(options),
            }),
            descriptors_vmo,
            data_vmo,
        ))
    }

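    /// Allocates a chain of `num_parts` tx descriptors.
    ///
    /// If not enough descriptors are currently free, the returned future
    /// resolves once a previous allocation is released back to the pool.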
    pub(in crate::session) async fn alloc_tx(
        self: &Arc<Self>,
        num_parts: ChainLength,
    ) -> AllocGuard<Tx> {
        let receiver = {
            let mut state = self.tx_alloc_state.lock();
            match state.free_list.try_alloc(num_parts, &self.descriptors) {
                Some(allocated) => {
                    return AllocGuard::new(allocated, self.clone());
                }
                None => {
                    let (request, receiver) = TxAllocReq::new(num_parts);
                    state.requests.push_back(request);
                    receiver
                }
            }
        };
        receiver.await.unwrap()
    }

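    /// Allocates a single tx [`Buffer`] with capacity for at least
    /// `num_bytes` of data, waiting if necessary for descriptors to free up.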
    pub(in crate::session) async fn alloc_tx_buffer(
        self: &Arc<Self>,
        num_bytes: usize,
    ) -> Result<Buffer<Tx>> {
        self.alloc_tx_buffers(num_bytes).await?.next().unwrap()
    }

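    /// Allocates tx buffers with capacity for at least `num_bytes` each.
    ///
    /// The returned iterator waits for the first buffer to become available
    /// and then yields further buffers without waiting until the pool runs
    /// out of free descriptors.
    ///
    /// A minimal usage sketch (hypothetical caller code; `pool` is assumed
    /// to be an `Arc<Pool>` and error handling is elided):
    ///
    /// ```ignore
    /// let buffers = pool.alloc_tx_buffers(64).await?;
    /// for buffer in buffers {
    ///     let mut buffer = buffer?;
    ///     buffer.write_at(0, &payload)?;
    ///     // ... hand the buffer to the session for transmission ...
    /// }
    /// ```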
    pub(in crate::session) async fn alloc_tx_buffers<'a>(
        self: &'a Arc<Self>,
        num_bytes: usize,
    ) -> Result<impl Iterator<Item = Result<Buffer<Tx>>> + 'a> {
        let BufferLayout { min_tx_data, min_tx_head, min_tx_tail, length: buffer_length } =
            self.buffer_layout;
        let tx_head = usize::from(min_tx_head);
        let tx_tail = usize::from(min_tx_tail);
        let total_bytes = num_bytes.max(min_tx_data) + tx_head + tx_tail;
        let num_parts = (total_bytes + buffer_length - 1) / buffer_length;
        let chain_length = ChainLength::try_from(num_parts)?;
        let first = self.alloc_tx(chain_length).await;
        let iter = std::iter::once(first)
            .chain(std::iter::from_fn(move || {
                let mut state = self.tx_alloc_state.lock();
                state
                    .free_list
                    .try_alloc(chain_length, &self.descriptors)
                    .map(|allocated| AllocGuard::new(allocated, self.clone()))
            }))
            .fuse()
            .map(move |mut alloc| {
                alloc.init(num_bytes)?;
                Ok(alloc.into())
            });
        Ok(iter)
    }

    pub(in crate::session) fn free_rx(&self, descs: impl IntoIterator<Item = DescId<Rx>>) {
        self.rx_pending.extend(descs.into_iter().map(|mut desc| {
            self.descriptors.borrow_mut(&mut desc).initialize(
                ChainLength::ZERO,
                0,
                self.buffer_layout.length.try_into().unwrap(),
                0,
            );
            desc
        }));
    }

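    /// Returns a tx descriptor chain to the free list and fulfills as many
    /// pending allocation requests as possible. Requests are completed with
    /// the state lock released, because dropping a fulfilled allocation
    /// re-enters `free_tx` and the lock is not reentrant.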
    fn free_tx(self: &Arc<Self>, chain: Chained<DescId<Tx>>) {
        // Requests that can be fulfilled are collected here and completed
        // after the lock is released.
        let mut to_fulfill = ArrayVec::<
            (TxAllocReq, AllocGuard<Tx>),
            { netdev::MAX_DESCRIPTOR_CHAIN as usize },
        >::new();

        let mut state = self.tx_alloc_state.lock();

        // Prepend the returned chain to the free list.
        {
            let mut descs = chain.into_iter();
            state.free_list.len += u16::try_from(descs.len()).unwrap();
            let head = descs.next();
            let old_head = std::mem::replace(&mut state.free_list.head, head);
            let mut tail = descs.last();
            let mut tail_ref = self.descriptors.borrow_mut(
                tail.as_mut().unwrap_or_else(|| state.free_list.head.as_mut().unwrap()),
            );
            tail_ref.set_nxt(old_head);
        }

        while let Some(req) = state.requests.front() {
            // Discard requests whose receivers have gone away.
            if req.sender.is_canceled() {
                let _cancelled: Option<TxAllocReq> = state.requests.pop_front();
                continue;
            }
            let size = req.size;
            match state.free_list.try_alloc(size, &self.descriptors) {
                Some(descs) => {
                    let req = state.requests.pop_front().unwrap();
                    to_fulfill.push((req, AllocGuard::new(descs, self.clone())));

                    if to_fulfill.is_full() {
                        // Fulfilling a request can drop the allocation (if
                        // the receiver is gone), which re-enters `free_tx`;
                        // the lock must not be held across it.
                        drop(state);
                        for (req, alloc) in to_fulfill.drain(..) {
                            req.fulfill(alloc)
                        }
                        state = self.tx_alloc_state.lock();
                    }
                }
                None => break,
            }
        }

        drop(state);
        for (req, alloc) in to_fulfill {
            req.fulfill(alloc)
        }
    }

    pub(in crate::session) fn tx_completed(self: &Arc<Self>, head: DescId<Tx>) -> Result<()> {
        let chain = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
        Ok(self.free_tx(chain))
    }

    pub(in crate::session) fn rx_completed(
        self: &Arc<Self>,
        head: DescId<Rx>,
    ) -> Result<Buffer<Rx>> {
        let descs = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
        let alloc = AllocGuard::new(descs, self.clone());
        Ok(alloc.into())
    }
}

impl Drop for Pool {
    fn drop(&mut self) {
        // SAFETY: `base` and `bytes` describe the mapping created in
        // `Pool::new`; every buffer holds an `Arc<Pool>`, so by the time the
        // pool is dropped nothing references the mapping anymore.
        unsafe {
            vmar_root_self()
                .unmap(self.base.as_ptr() as usize, self.bytes)
                .expect("failed to unmap VMO for Pool")
        }
    }
}

impl TxFreeList {
    /// Pops `num_parts` descriptors off the free list, or returns `None` if
    /// not enough descriptors are free.
    fn try_alloc(
        &mut self,
        num_parts: ChainLength,
        descriptors: &Descriptors,
    ) -> Option<Chained<DescId<Tx>>> {
        if u16::from(num_parts.get()) > self.len {
            return None;
        }

        let free_list = std::iter::from_fn(|| -> Option<DescId<Tx>> {
            let new_head = self.head.as_ref().and_then(|head| {
                let nxt = descriptors.borrow(head).nxt();
                // SAFETY: the `nxt` field of a descriptor on the free list
                // always stores the id of another free tx descriptor.
                nxt.map(|id| unsafe { DescId::from_raw(id) })
            });
            std::mem::replace(&mut self.head, new_head)
        });
        let allocated = free_list.take(num_parts.get().into()).collect::<Chained<_>>();
        assert_eq!(allocated.len(), num_parts.into());
        self.len -= u16::from(num_parts.get());
        Some(allocated)
    }
}

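/// A user-facing view of an allocation's data: a chain of contiguous VMO
/// regions (`parts`) backed by the descriptors held in `alloc`.
///
/// A `Buffer<Tx>` is written and then committed before being handed to the
/// device; a `Buffer<Rx>` is read after the device completes it. The `Read`,
/// `Write` and `Seek` impls operate relative to an internal cursor.
///
/// A minimal read sketch (hypothetical caller code, assuming an rx buffer
/// already obtained from the session):
///
/// ```ignore
/// let mut header = [0u8; 14];
/// buffer.read_at(0, &mut header)?;
/// ```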
pub struct Buffer<K: AllocKind> {
    /// The underlying descriptor allocation.
    alloc: AllocGuard<K>,
    /// The memory regions described by the descriptors, in chain order.
    parts: Chained<BufferPart>,
    /// Cursor used by the `Read`/`Write`/`Seek` implementations.
    pos: usize,
}

impl<K: AllocKind> Debug for Buffer<K> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { alloc, parts, pos } = self;
        f.debug_struct("Buffer")
            .field("cap", &self.cap())
            .field("alloc", alloc)
            .field("parts", parts)
            .field("pos", pos)
            .finish()
    }
}

impl<K: AllocKind> Buffer<K> {
    /// The total capacity of the buffer, in bytes, across all parts.
    pub fn cap(&self) -> usize {
        self.parts.iter().fold(0, |acc, part| acc + part.cap)
    }

    /// The number of bytes of valid data in the buffer.
    pub fn len(&self) -> usize {
        self.parts.iter().fold(0, |acc, part| acc + part.len)
    }

    /// Writes `src` into the buffer at `offset`, spanning parts as needed.
    pub fn write_at(&mut self, offset: usize, src: &[u8]) -> Result<()> {
        if self.cap() < offset + src.len() {
            return Err(Error::TooSmall { size: self.cap(), offset, length: src.len() });
        }
        let mut part_start = 0;
        let mut total = 0;
        for part in self.parts.iter_mut() {
            if offset + total < part_start + part.cap {
                let written = part.write_at(offset + total - part_start, &src[total..])?;
                total += written;
                if total == src.len() {
                    break;
                }
            } else {
                // The write starts in a later part; everything before it is
                // considered used.
                part.len = part.cap;
            }
            part_start += part.cap;
        }
        assert_eq!(total, src.len());
        Ok(())
    }

    /// Reads `dst.len()` bytes from the buffer at `offset` into `dst`.
    pub fn read_at(&self, offset: usize, dst: &mut [u8]) -> Result<()> {
        if self.len() < offset + dst.len() {
            return Err(Error::TooSmall { size: self.len(), offset, length: dst.len() });
        }
        let mut part_start = 0;
        let mut total = 0;
        for part in self.parts.iter() {
            if offset + total < part_start + part.cap {
                let read = part.read_at(offset + total - part_start, &mut dst[total..])?;
                total += read;
                if total == dst.len() {
                    break;
                }
            }
            part_start += part.cap;
        }
        assert_eq!(total, dst.len());
        Ok(())
    }

    /// Returns the buffer as a single mutable slice, if it has at most one part.
    pub fn as_slice_mut(&mut self) -> Option<&mut [u8]> {
        match &mut (*self.parts)[..] {
            [] => Some(&mut []),
            [one] => Some(one.as_slice_mut()),
            _ => None,
        }
    }

    /// Zero-pads the buffer up to the device's minimum tx data length,
    /// growing the last part into its tail space if necessary.
    pub(in crate::session) fn pad(&mut self) -> Result<()> {
        let num_parts = self.parts.len();
        let BufferLayout { min_tx_tail, min_tx_data, min_tx_head: _, length: _ } =
            self.alloc.pool.buffer_layout;
        let mut target = min_tx_data;
        for (i, part) in self.parts.iter_mut().enumerate() {
            let grow_cap = if i == num_parts - 1 {
                let descriptor =
                    self.alloc.descriptors().last().expect("descriptor must not be empty");
                let data_length = descriptor.data_length();
                let tail_length = descriptor.tail_length();
                let rest = usize::try_from(data_length).unwrap() + usize::from(tail_length);
                match rest.checked_sub(usize::from(min_tx_tail)) {
                    Some(grow_cap) => Some(grow_cap),
                    None => break,
                }
            } else {
                None
            };
            target -= part.pad(target, grow_cap)?;
        }
        if target != 0 {
            return Err(Error::Pad(min_tx_data, self.cap()));
        }
        Ok(())
    }

    /// Consumes the buffer and returns the head descriptor id without
    /// returning the chain to the pool.
    pub(in crate::session) fn leak(mut self) -> DescId<K> {
        let descs = std::mem::replace(&mut self.alloc.descs, Chained::empty());
        descs.into_iter().next().unwrap()
    }

    /// The frame type recorded in the buffer's descriptor.
    pub fn frame_type(&self) -> Result<netdev::FrameType> {
        self.alloc.descriptor().frame_type()
    }

    /// The port recorded in the buffer's descriptor.
    pub fn port(&self) -> Port {
        self.alloc.descriptor().port()
    }
}

impl Buffer<Tx> {
    /// Commits the written length of each part to its descriptor so the
    /// device knows how much data to send.
    pub(in crate::session) fn commit(&mut self) {
        for (part, mut descriptor) in self.parts.iter_mut().zip(self.alloc.descriptors_mut()) {
            descriptor.commit(u32::try_from(part.len).unwrap())
        }
    }

    /// Sets the destination port for the frame.
    pub fn set_port(&mut self, port: Port) {
        self.alloc.descriptor_mut().set_port(port)
    }

    /// Sets the frame type of the buffer.
    pub fn set_frame_type(&mut self, frame_type: netdev::FrameType) {
        self.alloc.descriptor_mut().set_frame_type(frame_type)
    }

    /// Sets the tx flags for the frame.
    pub fn set_tx_flags(&mut self, flags: netdev::TxFlags) {
        self.alloc.descriptor_mut().set_tx_flags(flags)
    }
}

impl Buffer<Rx> {
    /// Converts this rx buffer into a tx buffer so its data can be sent back
    /// out without copying.
    pub async fn into_tx(self) -> Buffer<Tx> {
        let Buffer { alloc, parts, pos } = self;
        Buffer { alloc: alloc.into_tx().await, parts, pos }
    }

    /// The rx flags reported by the device for this frame.
    pub fn rx_flags(&self) -> Result<netdev::RxFlags> {
        self.alloc.descriptor().rx_flags()
    }
}

impl AllocGuard<Rx> {
    /// Swaps this allocation's descriptors with a freshly allocated tx chain
    /// of the same length; the fresh descriptors take this allocation's
    /// place on the rx side when `self` is dropped.
    async fn into_tx(mut self) -> AllocGuard<Tx> {
        let mut tx = self.pool.alloc_tx(self.descs.len).await;
        // SAFETY: tx and rx descriptor ids share the same representation and
        // both chains have the same length, so swapping the backing storage
        // keeps both `len` fields consistent.
        std::mem::swap(&mut self.descs.storage, unsafe {
            std::mem::transmute(&mut tx.descs.storage)
        });
        tx
    }
}

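/// An inline container for at most [`netdev::MAX_DESCRIPTOR_CHAIN`] elements.
///
/// Invariant: the first `len` slots of `storage` are initialized.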
struct Chained<T> {
    storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
    len: ChainLength,
}

impl<T> Deref for Chained<T> {
    type Target = [T];

    fn deref(&self) -> &Self::Target {
        // SAFETY: the first `len` elements are always initialized.
        unsafe { std::mem::transmute(&self.storage[..self.len.into()]) }
    }
}

impl<T> DerefMut for Chained<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        // SAFETY: the first `len` elements are always initialized.
        unsafe { std::mem::transmute(&mut self.storage[..self.len.into()]) }
    }
}

impl<T> Drop for Chained<T> {
    fn drop(&mut self) {
        // SAFETY: only the initialized prefix is dropped.
        unsafe {
            std::ptr::drop_in_place(self.deref_mut());
        }
    }
}

impl<T: Debug> Debug for Chained<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_list().entries(self.iter()).finish()
    }
}

impl<T> Chained<T> {
    #[allow(clippy::uninit_assumed_init)]
    fn empty() -> Self {
        // An array of `MaybeUninit` does not require initialization.
        Self { storage: unsafe { MaybeUninit::uninit().assume_init() }, len: ChainLength::ZERO }
    }
}

impl<T> FromIterator<T> for Chained<T> {
    /// Collects at most [`netdev::MAX_DESCRIPTOR_CHAIN`] elements.
    ///
    /// # Panics
    ///
    /// Panics if the iterator is empty or yields more elements than fit in a
    /// chain.
    fn from_iter<I: IntoIterator<Item = T>>(elements: I) -> Self {
        let mut result = Self::empty();
        let mut len = 0u8;
        for (idx, e) in elements.into_iter().enumerate() {
            result.storage[idx] = MaybeUninit::new(e);
            len += 1;
        }
        assert!(len > 0);
        result.len = ChainLength::try_from(len).unwrap();
        result
    }
}

impl<T> IntoIterator for Chained<T> {
    type Item = T;
    type IntoIter = ChainedIter<T>;

    fn into_iter(mut self) -> Self::IntoIter {
        let len = self.len;
        self.len = ChainLength::ZERO;
        // `Chained` implements `Drop`, so the storage cannot be moved out
        // directly; swap in an uninitialized array and clear `len` so the
        // drop of `self` is a no-op.
        #[allow(clippy::uninit_assumed_init)]
        let storage =
            std::mem::replace(&mut self.storage, unsafe { MaybeUninit::uninit().assume_init() });
        ChainedIter { storage, len, consumed: 0 }
    }
}

struct ChainedIter<T> {
    storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
    len: ChainLength,
    consumed: u8,
}

impl<T> Iterator for ChainedIter<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        if self.consumed < self.len.get() {
            // SAFETY: elements in `consumed..len` are still initialized; the
            // slot is replaced with uninitialized memory before being read.
            let value = unsafe {
                std::mem::replace(
                    &mut self.storage[usize::from(self.consumed)],
                    MaybeUninit::uninit(),
                )
                .assume_init()
            };
            self.consumed += 1;
            Some(value)
        } else {
            None
        }
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let len = usize::from(self.len.get() - self.consumed);
        (len, Some(len))
    }
}

impl<T> ExactSizeIterator for ChainedIter<T> {}

impl<T> Drop for ChainedIter<T> {
    fn drop(&mut self) {
        // SAFETY: elements in `consumed..len` have not been yielded yet and
        // are still initialized.
        unsafe {
            std::ptr::drop_in_place(std::mem::transmute::<_, &mut [T]>(
                &mut self.storage[self.consumed.into()..self.len.into()],
            ));
        }
    }
}

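/// An owned allocation of descriptors; the descriptors are returned to the
/// pool when the guard is dropped.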
pub(in crate::session) struct AllocGuard<K: AllocKind> {
    descs: Chained<DescId<K>>,
    pool: Arc<Pool>,
}

impl<K: AllocKind> Debug for AllocGuard<K> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { descs, pool: _ } = self;
        f.debug_struct("AllocGuard").field("descs", descs).finish()
    }
}

impl<K: AllocKind> AllocGuard<K> {
    fn new(descs: Chained<DescId<K>>, pool: Arc<Pool>) -> Self {
        Self { descs, pool }
    }

    fn descriptors(&self) -> impl Iterator<Item = DescRef<'_, K>> + '_ {
        self.descs.iter().map(move |desc| self.pool.descriptors.borrow(desc))
    }

    fn descriptors_mut(&mut self) -> impl Iterator<Item = DescRefMut<'_, K>> + '_ {
        let descriptors = &self.pool.descriptors;
        self.descs.iter_mut().map(move |desc| descriptors.borrow_mut(desc))
    }

    fn descriptor(&self) -> DescRef<'_, K> {
        self.descriptors().next().expect("descriptors must not be empty")
    }

    fn descriptor_mut(&mut self) -> DescRefMut<'_, K> {
        self.descriptors_mut().next().expect("descriptors must not be empty")
    }
}

impl AllocGuard<Tx> {
    /// Initializes the descriptors so that head room goes on the first
    /// descriptor, tail room on the last, and `requested_bytes` of data are
    /// spread across the chain.
    fn init(&mut self, mut requested_bytes: usize) -> Result<()> {
        let len = self.len();
        let BufferLayout { min_tx_head, min_tx_tail, length: buffer_length, min_tx_data: _ } =
            self.pool.buffer_layout;
        // `clen` counts how many descriptors follow the current one.
        for (mut descriptor, clen) in self.descriptors_mut().zip((0..len).rev()) {
            let chain_length = ChainLength::try_from(clen).unwrap();
            let head_length = if clen + 1 == len { min_tx_head } else { 0 };
            let mut tail_length = if clen == 0 { min_tx_tail } else { 0 };

            let available_bytes =
                u32::try_from(buffer_length - usize::from(head_length) - usize::from(tail_length))
                    .unwrap();

            let data_length = match u32::try_from(requested_bytes) {
                Ok(requested) => {
                    if requested < available_bytes {
                        // The request ends in this descriptor; the remaining
                        // space becomes extra tail room.
                        tail_length = u16::try_from(available_bytes - requested)
                            .ok_checked::<TryFromIntError>()
                            .and_then(|tail_adjustment| tail_length.checked_add(tail_adjustment))
                            .ok_or(Error::TxLength)?;
                    }
                    requested.min(available_bytes)
                }
                Err(TryFromIntError { .. }) => available_bytes,
            };

            requested_bytes -=
                usize::try_from(data_length).unwrap_or_else(|TryFromIntError { .. }| {
                    panic!(
                        "data_length: {} must be smaller than requested_bytes: {}, which is a usize",
                        data_length, requested_bytes
                    )
                });
            descriptor.initialize(chain_length, head_length, data_length, tail_length);
        }
        assert_eq!(requested_bytes, 0);
        Ok(())
    }
}

impl<K: AllocKind> Drop for AllocGuard<K> {
    fn drop(&mut self) {
        if self.is_empty() {
            return;
        }
        K::free(private::Allocation(self));
    }
}

impl<K: AllocKind> Deref for AllocGuard<K> {
    type Target = [DescId<K>];

    fn deref(&self) -> &Self::Target {
        self.descs.deref()
    }
}

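/// A single contiguous region of the data VMO backing one descriptor of a
/// [`Buffer`].
///
/// Invariant: `ptr` points into the pool mapping, the part has exclusive
/// access to `cap` bytes starting at `ptr`, and `len <= cap`.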
struct BufferPart {
    /// Pointer into the pool mapping where this part starts.
    ptr: *mut u8,
    /// The capacity of this part, in bytes.
    cap: usize,
    /// How many bytes of this part contain valid data.
    len: usize,
}

impl BufferPart {
    /// Creates a new `BufferPart`.
    ///
    /// # Safety
    ///
    /// `ptr` must point to a region of at least `cap` bytes that outlives the
    /// part and is not accessed through any other `BufferPart`.
    unsafe fn new(ptr: *mut u8, cap: usize, len: usize) -> Self {
        Self { ptr, cap, len }
    }

    fn read_at(&self, offset: usize, dst: &mut [u8]) -> Result<usize> {
        let available = self.len.checked_sub(offset).ok_or(Error::Index(offset, self.len))?;
        let to_copy = std::cmp::min(available, dst.len());
        // SAFETY: `offset + to_copy <= len <= cap`, so the copy stays within
        // this part's region; `dst` is a separate allocation.
        unsafe { std::ptr::copy_nonoverlapping(self.ptr.add(offset), dst.as_mut_ptr(), to_copy) }
        Ok(to_copy)
    }

    fn write_at(&mut self, offset: usize, src: &[u8]) -> Result<usize> {
        let available = self.cap.checked_sub(offset).ok_or(Error::Index(offset, self.cap))?;
        let to_copy = std::cmp::min(src.len(), available);
        // SAFETY: `offset + to_copy <= cap`, so the copy stays within this
        // part's region; `src` is a separate allocation.
        unsafe { std::ptr::copy_nonoverlapping(src.as_ptr(), self.ptr.add(offset), to_copy) }
        self.len = std::cmp::max(self.len, offset + to_copy);
        Ok(to_copy)
    }

    /// Zero-fills this part up to `target` bytes, optionally growing its
    /// capacity up to `limit`, and returns how many of the `target` bytes
    /// this part now covers.
    fn pad(&mut self, target: usize, limit: Option<usize>) -> Result<usize> {
        if target <= self.len {
            return Ok(target);
        }
        if let Some(limit) = limit {
            if target > limit {
                return Err(Error::Pad(target, self.cap));
            }
            if self.cap < target {
                self.cap = target
            }
        }
        let new_len = std::cmp::min(target, self.cap);
        // SAFETY: `new_len <= cap`, and the caller only grows `cap` (via
        // `limit`) within the descriptor's buffer region.
        unsafe {
            std::ptr::write_bytes(self.ptr.add(self.len), 0, new_len - self.len);
        }
        self.len = new_len;
        Ok(new_len)
    }

    fn as_slice_mut(&mut self) -> &mut [u8] {
        // SAFETY: the first `len` bytes are initialized and exclusively
        // owned by this part.
        unsafe { std::slice::from_raw_parts_mut(self.ptr, self.len) }
    }
}

// SAFETY: `BufferPart` has exclusive access to the memory region it
// describes, so it is safe to send across threads.
unsafe impl Send for BufferPart {}

impl Debug for BufferPart {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let BufferPart { len, cap, ptr } = &self;
        f.debug_struct("BufferPart").field("ptr", ptr).field("len", len).field("cap", cap).finish()
    }
}

impl<K: AllocKind> From<AllocGuard<K>> for Buffer<K> {
    fn from(alloc: AllocGuard<K>) -> Self {
        let AllocGuard { pool, descs: _ } = &alloc;
        let parts: Chained<BufferPart> = alloc
            .descriptors()
            .map(|descriptor| {
                let offset = usize::try_from(descriptor.offset()).unwrap();
                let head_length = usize::from(descriptor.head_length());
                let data_length = usize::try_from(descriptor.data_length()).unwrap();
                // Rx buffers start out full of data; tx buffers start empty.
                let len = match K::REFL {
                    AllocKindRefl::Tx => 0,
                    AllocKindRefl::Rx => data_length,
                };
                assert!(
                    offset + head_length <= pool.bytes,
                    "buffer part starts beyond the end of pool"
                );
                assert!(
                    offset + head_length + data_length <= pool.bytes,
                    "buffer part ends beyond the end of pool"
                );
                // SAFETY: the part lies within the pool mapping (checked
                // above) and each descriptor covers a distinct region.
                unsafe {
                    BufferPart::new(pool.base.as_ptr().add(offset + head_length), data_length, len)
                }
            })
            .collect();
        Self { alloc, parts, pos: 0 }
    }
}

impl<K: AllocKind> Read for Buffer<K> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        self.read_at(self.pos, buf)
            .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidInput, error))?;
        self.pos += buf.len();
        Ok(buf.len())
    }
}

impl Write for Buffer<Tx> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.write_at(self.pos, buf)
            .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidInput, error))?;
        self.pos += buf.len();
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

impl<K: AllocKind> Seek for Buffer<K> {
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        let pos = match pos {
            SeekFrom::Start(pos) => pos,
            SeekFrom::End(offset) => {
                let end = i64::try_from(self.cap())
                    .map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
                u64::try_from(end.wrapping_add(offset)).unwrap()
            }
            SeekFrom::Current(offset) => {
                let current = i64::try_from(self.pos)
                    .map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
                u64::try_from(current.wrapping_add(offset)).unwrap()
            }
        };
        self.pos =
            usize::try_from(pos).map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
        Ok(pos)
    }
}

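/// A pending request for `size` tx descriptors, fulfilled through a oneshot
/// channel once enough descriptors are freed.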
struct TxAllocReq {
    sender: Sender<AllocGuard<Tx>>,
    size: ChainLength,
}

impl TxAllocReq {
    fn new(size: ChainLength) -> (Self, Receiver<AllocGuard<Tx>>) {
        let (sender, receiver) = channel();
        (TxAllocReq { sender, size }, receiver)
    }

    fn fulfill(self, guard: AllocGuard<Tx>) {
        let Self { sender, size: _ } = self;
        match sender.send(guard) {
            Ok(()) => (),
            Err(guard) => {
                // The receiver is gone; dropping the guard returns the
                // descriptors to the free list.
                drop(guard);
            }
        }
    }
}

mod private {
    use super::{AllocKind, Rx, Tx};
    /// Seals [`AllocKind`] so it can only be implemented in this module.
    pub trait Sealed: 'static + Sized {}
    impl Sealed for Rx {}
    impl Sealed for Tx {}

    /// A wrapper only this module can construct, handed to [`AllocKind::free`].
    pub struct Allocation<'a, K: AllocKind>(pub(super) &'a mut super::AllocGuard<K>);
}

pub trait AllocKind: private::Sealed {
    /// A value-level representation of the kind, used to branch on `K`.
    const REFL: AllocKindRefl;

    /// Returns the descriptors held by `alloc` to the pool.
    fn free(alloc: private::Allocation<'_, Self>);
}

/// Marker type for transmit allocations.
pub enum Tx {}
/// Marker type for receive allocations.
pub enum Rx {}

pub enum AllocKindRefl {
    Tx,
    Rx,
}

impl AllocKindRefl {
    pub(in crate::session) fn as_str(&self) -> &'static str {
        match self {
            AllocKindRefl::Tx => "Tx",
            AllocKindRefl::Rx => "Rx",
        }
    }
}

impl AllocKind for Tx {
    const REFL: AllocKindRefl = AllocKindRefl::Tx;

    fn free(alloc: private::Allocation<'_, Self>) {
        let private::Allocation(AllocGuard { pool, descs }) = alloc;
        pool.free_tx(std::mem::replace(descs, Chained::empty()));
    }
}

impl AllocKind for Rx {
    const REFL: AllocKindRefl = AllocKindRefl::Rx;

    fn free(alloc: private::Allocation<'_, Self>) {
        let private::Allocation(AllocGuard { pool, descs }) = alloc;
        pool.free_rx(std::mem::replace(descs, Chained::empty()));
        pool.rx_leases.rx_complete();
    }
}

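/// State used to implement delayed release of rx power leases.
///
/// `rx_frame_counter` counts completed rx frames. An [`RxLeaseWatcher`]
/// temporarily subtracts the frame number it is waiting for, so the counter
/// wrapping back past `u64::MAX` signals that enough frames have completed.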
pub(in crate::session) struct RxLeaseHandlingState {
    /// Whether an [`RxLeaseWatcher`] may still be created.
    can_watch_rx_leases: AtomicBool,
    /// Counts completed rx frames; a watcher parks it at wrapped-around
    /// values while it waits.
    rx_frame_counter: AtomicU64,
    /// Wakes the watcher when the counter crosses its threshold.
    rx_lease_waker: AtomicWaker,
}

impl RxLeaseHandlingState {
    fn new_with_flags(flags: netdev::SessionFlags) -> Self {
        Self::new_with_enabled(flags.contains(netdev::SessionFlags::RECEIVE_RX_POWER_LEASES))
    }

    fn new_with_enabled(enabled: bool) -> Self {
        Self {
            can_watch_rx_leases: AtomicBool::new(enabled),
            rx_frame_counter: AtomicU64::new(0),
            rx_lease_waker: AtomicWaker::new(),
        }
    }

    /// Increments the completed rx frame counter, waking a pending watcher
    /// when the counter wraps back around.
    fn rx_complete(&self) {
        let Self { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } = self;
        let prev = rx_frame_counter.fetch_add(1, atomic::Ordering::SeqCst);

        // A watcher parks the counter at wrapped-around (very large) values;
        // incrementing past u64::MAX means its threshold was just reached.
        if prev == u64::MAX {
            rx_lease_waker.wake();
        }
    }
}

/// A type that can provide access to an [`RxLeaseHandlingState`].
pub(in crate::session) trait RxLeaseHandlingStateContainer {
    fn lease_handling_state(&self) -> &RxLeaseHandlingState;
}

impl<T: Borrow<RxLeaseHandlingState>> RxLeaseHandlingStateContainer for T {
    fn lease_handling_state(&self) -> &RxLeaseHandlingState {
        self.borrow()
    }
}

impl RxLeaseHandlingStateContainer for Arc<Pool> {
    fn lease_handling_state(&self) -> &RxLeaseHandlingState {
        &self.rx_leases
    }
}

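/// A watcher that can wait until a given number of rx frames have been
/// completed; only one watcher may exist per state.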
pub(in crate::session) struct RxLeaseWatcher<T> {
    state: T,
}

impl<T: RxLeaseHandlingStateContainer> RxLeaseWatcher<T> {
    /// Creates a new watcher.
    ///
    /// # Panics
    ///
    /// Panics if lease watching is not enabled or a watcher was already
    /// created for this state.
    pub(in crate::session) fn new(state: T) -> Self {
        assert!(
            state.lease_handling_state().can_watch_rx_leases.swap(false, atomic::Ordering::SeqCst),
            "can't watch rx leases"
        );
        Self { state }
    }

    /// Waits until at least `hold_until_frame` rx frames have been completed
    /// in total.
    pub(in crate::session) async fn wait_until(&mut self, hold_until_frame: u64) {
        let RxLeaseHandlingState { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } =
            self.state.lease_handling_state();

        // Park the counter below zero (wrapping) by the number of frames we
        // are waiting for; each rx_complete pays the deficit back by one.
        let prev = rx_frame_counter.fetch_sub(hold_until_frame, atomic::Ordering::SeqCst);
        // Restore the subtraction even if this future is dropped early.
        let _guard = scopeguard::guard((), |()| {
            let _: u64 = rx_frame_counter.fetch_add(hold_until_frame, atomic::Ordering::SeqCst);
        });

        if prev >= hold_until_frame {
            return;
        }
        let threshold = prev.wrapping_sub(hold_until_frame);
        futures::future::poll_fn(|cx| {
            let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
            if v < threshold {
                return Poll::Ready(());
            }
            rx_lease_waker.register(cx.waker());
            // Check again after registering the waker to avoid a lost wakeup.
            let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
            if v < threshold {
                return Poll::Ready(());
            }
            Poll::Pending
        })
        .await;
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use assert_matches::assert_matches;
    use fuchsia_async as fasync;
    use futures::future::FutureExt;
    use test_case::test_case;

    use std::collections::HashSet;
    use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize};
    use std::pin::pin;
    use std::task::{Poll, Waker};

    const DEFAULT_MIN_TX_BUFFER_HEAD: u16 = 4;
    const DEFAULT_MIN_TX_BUFFER_TAIL: u16 = 8;
    const DEFAULT_BUFFER_LENGTH: NonZeroUsize = NonZeroUsize::new(64).unwrap();
    const DEFAULT_TX_BUFFERS: NonZeroU16 = NonZeroU16::new(8).unwrap();
    const DEFAULT_RX_BUFFERS: NonZeroU16 = NonZeroU16::new(8).unwrap();
    const MAX_BUFFER_BYTES: usize = DEFAULT_BUFFER_LENGTH.get()
        * netdev::MAX_DESCRIPTOR_CHAIN as usize
        - DEFAULT_MIN_TX_BUFFER_HEAD as usize
        - DEFAULT_MIN_TX_BUFFER_TAIL as usize;

    const SENTINEL_BYTE: u8 = 0xab;
    const WRITE_BYTE: u8 = 1;
    const PAD_BYTE: u8 = 0;

    const DEFAULT_CONFIG: Config = Config {
        buffer_stride: NonZeroU64::new(DEFAULT_BUFFER_LENGTH.get() as u64).unwrap(),
        num_rx_buffers: DEFAULT_RX_BUFFERS,
        num_tx_buffers: DEFAULT_TX_BUFFERS,
        options: netdev::SessionFlags::empty(),
        buffer_layout: BufferLayout {
            length: DEFAULT_BUFFER_LENGTH.get(),
            min_tx_head: DEFAULT_MIN_TX_BUFFER_HEAD,
            min_tx_tail: DEFAULT_MIN_TX_BUFFER_TAIL,
            min_tx_data: 0,
        },
    };

    impl Pool {
        fn new_test_default() -> Arc<Self> {
            let (pool, _descriptors, _data) =
                Pool::new(DEFAULT_CONFIG).expect("failed to create default pool");
            pool
        }

        async fn alloc_tx_checked(self: &Arc<Self>, n: u8) -> AllocGuard<Tx> {
            self.alloc_tx(ChainLength::try_from(n).expect("failed to convert to chain length"))
                .await
        }

        fn alloc_tx_now_or_never(self: &Arc<Self>, n: u8) -> Option<AllocGuard<Tx>> {
            self.alloc_tx_checked(n).now_or_never()
        }

        fn alloc_tx_all(self: &Arc<Self>, n: u8) -> Vec<AllocGuard<Tx>> {
            std::iter::from_fn(|| self.alloc_tx_now_or_never(n)).collect()
        }

        fn alloc_tx_buffer_now_or_never(self: &Arc<Self>, num_bytes: usize) -> Option<Buffer<Tx>> {
            self.alloc_tx_buffer(num_bytes)
                .now_or_never()
                .transpose()
                .expect("invalid arguments for alloc_tx_buffer")
        }

        fn set_min_tx_buffer_length(self: &mut Arc<Self>, length: usize) {
            Arc::get_mut(self).unwrap().buffer_layout.min_tx_data = length;
        }

        fn fill_sentinel_bytes(&mut self) {
            unsafe { std::ptr::write_bytes(self.base.as_ptr(), SENTINEL_BYTE, self.bytes) };
        }
    }

    impl Buffer<Tx> {
        fn check_write_and_pad(&mut self, offset: usize, pad_size: usize) {
            self.write_at(offset, &[WRITE_BYTE][..]).expect("failed to write to self");
            self.pad().expect("failed to pad");
            assert_eq!(self.len(), pad_size);
            const INIT_BYTE: u8 = 42;
            let mut read_buf = vec![INIT_BYTE; pad_size];
            self.read_at(0, &mut read_buf[..]).expect("failed to read from self");
            for (idx, byte) in read_buf.iter().enumerate() {
                if idx < offset {
                    assert_eq!(*byte, SENTINEL_BYTE);
                } else if idx == offset {
                    assert_eq!(*byte, WRITE_BYTE);
                } else {
                    assert_eq!(*byte, PAD_BYTE);
                }
            }
        }
    }

    impl<K, I, T> PartialEq<T> for Chained<DescId<K>>
    where
        K: AllocKind,
        I: ExactSizeIterator<Item = u16>,
        T: Copy + IntoIterator<IntoIter = I>,
    {
        fn eq(&self, other: &T) -> bool {
            let iter = other.into_iter();
            if usize::from(self.len) != iter.len() {
                return false;
            }
            self.iter().zip(iter).all(|(l, r)| l.get() == r)
        }
    }

    impl Debug for TxAllocReq {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            let TxAllocReq { sender: _, size } = self;
            f.debug_struct("TxAllocReq").field("size", &size).finish_non_exhaustive()
        }
    }

    #[test]
    fn alloc_tx_distinct() {
        let pool = Pool::new_test_default();
        let allocated = pool.alloc_tx_all(1);
        assert_eq!(allocated.len(), DEFAULT_TX_BUFFERS.get().into());
        let distinct = allocated
            .iter()
            .map(|alloc| {
                assert_eq!(alloc.descs.len(), 1);
                alloc.descs[0].get()
            })
            .collect::<HashSet<u16>>();
        assert_eq!(allocated.len(), distinct.len());
    }

    #[test]
    fn alloc_tx_free_len() {
        let pool = Pool::new_test_default();
        {
            let allocated = pool.alloc_tx_all(2);
            assert_eq!(
                allocated.iter().fold(0, |acc, a| { acc + a.descs.len() }),
                DEFAULT_TX_BUFFERS.get().into()
            );
            assert_eq!(pool.tx_alloc_state.lock().free_list.len, 0);
        }
        assert_eq!(pool.tx_alloc_state.lock().free_list.len, DEFAULT_TX_BUFFERS.get());
    }

    #[test]
    fn alloc_tx_chain() {
        let pool = Pool::new_test_default();
        let allocated = pool.alloc_tx_all(3);
        assert_eq!(allocated.len(), usize::from(DEFAULT_TX_BUFFERS.get()) / 3);
        assert_matches!(pool.alloc_tx_now_or_never(3), None);
        assert_matches!(pool.alloc_tx_now_or_never(2), Some(a) if a.descs.len() == 2);
    }

    #[test]
    fn alloc_tx_many() {
        let pool = Pool::new_test_default();
        let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
            - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
            - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
        let data_len = usize::try_from(data_len).unwrap();
        let mut buffers = pool
            .alloc_tx_buffers(data_len)
            .now_or_never()
            .expect("failed to alloc")
            .unwrap()
            .collect::<Result<Vec<_>>>()
            .expect("buffer error");
        assert_eq!(buffers.len(), DEFAULT_TX_BUFFERS.get().into());

        assert!(pool.alloc_tx_buffers(data_len).now_or_never().is_none());

        assert_matches!(buffers.pop(), Some(_));
        let mut more_buffers =
            pool.alloc_tx_buffers(data_len).now_or_never().expect("failed to alloc").unwrap();
        let buffer = assert_matches!(more_buffers.next(), Some(Ok(b)) => b);
        assert_matches!(more_buffers.next(), None);
        drop(buffer);
        assert_matches!(more_buffers.next(), None);
    }

    #[test]
    fn alloc_tx_after_free() {
        let pool = Pool::new_test_default();
        let mut allocated = pool.alloc_tx_all(1);
        assert_matches!(pool.alloc_tx_now_or_never(2), None);
        {
            let _drained = allocated.drain(..2);
        }
        assert_matches!(pool.alloc_tx_now_or_never(2), Some(a) if a.descs.len() == 2);
    }

    #[test]
    fn blocking_alloc_tx() {
        let mut executor = fasync::TestExecutor::new();
        let pool = Pool::new_test_default();
        let mut allocated = pool.alloc_tx_all(1);
        let alloc_fut = pool.alloc_tx_checked(1);
        let mut alloc_fut = pin!(alloc_fut);
        assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
        assert!(!pool.tx_alloc_state.lock().requests.is_empty());
        let freed = allocated
            .pop()
            .expect("no fulfilled allocations")
            .iter()
            .map(|x| x.get())
            .collect::<Chained<_>>();
        let same_as_freed =
            |descs: &Chained<DescId<Tx>>| descs.iter().map(|x| x.get()).eq(freed.iter().copied());
        assert_matches!(
            &executor.run_until_stalled(&mut alloc_fut),
            Poll::Ready(AllocGuard { descs, pool: _ }) if same_as_freed(descs)
        );
        assert!(pool.tx_alloc_state.lock().requests.is_empty());
    }

    #[test]
    fn blocking_alloc_tx_cancel_before_free() {
        let mut executor = fasync::TestExecutor::new();
        let pool = Pool::new_test_default();
        let mut allocated = pool.alloc_tx_all(1);
        {
            let alloc_fut = pool.alloc_tx_checked(1);
            let mut alloc_fut = pin!(alloc_fut);
            assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
            assert_matches!(
                pool.tx_alloc_state.lock().requests.as_slices(),
                (&[ref req1, ref req2], &[]) if req1.size.get() == 1 && req2.size.get() == 1
            );
        }
        assert_matches!(
            allocated.pop(),
            Some(AllocGuard { ref descs, pool: ref p })
                if descs == &[DEFAULT_TX_BUFFERS.get() - 1] && Arc::ptr_eq(p, &pool)
        );
        let state = pool.tx_alloc_state.lock();
        assert_eq!(state.free_list.len, 1);
        assert!(state.requests.is_empty());
    }

    #[test]
    fn blocking_alloc_tx_cancel_after_free() {
        let mut executor = fasync::TestExecutor::new();
        let pool = Pool::new_test_default();
        let mut allocated = pool.alloc_tx_all(1);
        {
            let alloc_fut = pool.alloc_tx_checked(1);
            let mut alloc_fut = pin!(alloc_fut);
            assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
            assert_matches!(
                pool.tx_alloc_state.lock().requests.as_slices(),
                (&[ref req1, ref req2], &[]) if req1.size.get() == 1 && req2.size.get() == 1
            );
            assert_matches!(
                allocated.pop(),
                Some(AllocGuard { ref descs, pool: ref p })
                    if descs == &[DEFAULT_TX_BUFFERS.get() - 1] && Arc::ptr_eq(p, &pool)
            );
        }
        let state = pool.tx_alloc_state.lock();
        assert_eq!(state.free_list.len, 1);
        assert!(state.requests.is_empty());
    }

    #[test]
    fn multiple_blocking_alloc_tx_fulfill_order() {
        const TASKS_TOTAL: usize = 3;
        let mut executor = fasync::TestExecutor::new();
        let pool = Pool::new_test_default();
        let mut allocated = pool.alloc_tx_all(1);
        let mut alloc_futs = (1..=TASKS_TOTAL)
            .rev()
            .map(|x| {
                let pool = pool.clone();
                (x, Box::pin(async move { pool.alloc_tx_checked(x.try_into().unwrap()).await }))
            })
            .collect::<Vec<_>>();

        for (idx, (req_size, task)) in alloc_futs.iter_mut().enumerate() {
            assert_matches!(executor.run_until_stalled(task), Poll::Pending);
            assert_eq!(idx + *req_size, TASKS_TOTAL);
        }
        {
            let state = pool.tx_alloc_state.lock();
            assert_eq!(state.requests.len(), TASKS_TOTAL + 1);
            let mut requests = state.requests.iter();
            assert!(requests.next().unwrap().sender.is_canceled());
            assert!(requests.all(|req| !req.sender.is_canceled()))
        }

        let mut to_free = Vec::new();
        let mut freed = 0;
        for free_size in (1..=TASKS_TOTAL).rev() {
            let (_req_size, mut task) = alloc_futs.remove(0);
            for _ in 1..free_size {
                freed += 1;
                assert_matches!(
                    allocated.pop(),
                    Some(AllocGuard { ref descs, pool: ref p })
                        if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
                );
                assert_matches!(executor.run_until_stalled(&mut task), Poll::Pending);
            }
            freed += 1;
            assert_matches!(
                allocated.pop(),
                Some(AllocGuard { ref descs, pool: ref p })
                    if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
            );
            match executor.run_until_stalled(&mut task) {
                Poll::Ready(alloc) => {
                    assert_eq!(alloc.len(), free_size);
                    to_free.push(alloc);
                }
                Poll::Pending => panic!("The request should be fulfilled"),
            }
            for (_req_size, task) in alloc_futs.iter_mut() {
                assert_matches!(executor.run_until_stalled(task), Poll::Pending);
            }
        }
        assert!(pool.tx_alloc_state.lock().requests.is_empty());
    }

    #[test]
    fn singleton_tx_layout() {
        let pool = Pool::new_test_default();
        let buffers = std::iter::from_fn(|| {
            let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
                - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
                - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
            pool.alloc_tx_buffer_now_or_never(usize::try_from(data_len).unwrap()).map(|buffer| {
                assert_eq!(buffer.alloc.descriptors().count(), 1);
                let offset = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
                    * u64::from(buffer.alloc[0].get());
                {
                    let descriptor = buffer.alloc.descriptor();
                    assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
                    assert_eq!(descriptor.head_length(), DEFAULT_MIN_TX_BUFFER_HEAD);
                    assert_eq!(descriptor.tail_length(), DEFAULT_MIN_TX_BUFFER_TAIL);
                    assert_eq!(descriptor.data_length(), data_len);
                    assert_eq!(descriptor.offset(), offset);
                }

                assert_eq!(buffer.parts.len(), 1);
                let BufferPart { ptr, len, cap } = buffer.parts[0];
                assert_eq!(len, 0);
                assert_eq!(
                    pool.base.as_ptr().wrapping_add(
                        usize::try_from(offset).unwrap() + usize::from(DEFAULT_MIN_TX_BUFFER_HEAD),
                    ),
                    ptr
                );
                assert_eq!(data_len, u32::try_from(cap).unwrap());
                buffer
            })
        })
        .collect::<Vec<_>>();
        assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()));
    }

    #[test]
    fn chained_tx_layout() {
        let pool = Pool::new_test_default();
        let alloc_len = 4 * DEFAULT_BUFFER_LENGTH.get()
            - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
            - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL);
        let buffers = std::iter::from_fn(|| {
            pool.alloc_tx_buffer_now_or_never(alloc_len).map(|buffer| {
                assert_eq!(buffer.parts.len(), 4);
                for (idx, descriptor) in buffer.alloc.descriptors().enumerate() {
                    let chain_length = ChainLength::try_from(buffer.alloc.len() - idx - 1).unwrap();
                    let head_length = if idx == 0 { DEFAULT_MIN_TX_BUFFER_HEAD } else { 0 };
                    let tail_length = if chain_length == ChainLength::ZERO {
                        DEFAULT_MIN_TX_BUFFER_TAIL
                    } else {
                        0
                    };
                    let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
                        - u32::from(head_length)
                        - u32::from(tail_length);
                    let offset = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
                        * u64::from(buffer.alloc[idx].get());
                    assert_eq!(descriptor.chain_length().unwrap(), chain_length);
                    assert_eq!(descriptor.head_length(), head_length);
                    assert_eq!(descriptor.tail_length(), tail_length);
                    assert_eq!(descriptor.offset(), offset);
                    assert_eq!(descriptor.data_length(), data_len);
                    if chain_length != ChainLength::ZERO {
                        assert_eq!(descriptor.nxt(), Some(buffer.alloc[idx + 1].get()));
                    }

                    let BufferPart { ptr, cap, len } = buffer.parts[idx];
                    assert_eq!(len, 0);
                    assert_eq!(
                        pool.base.as_ptr().wrapping_add(
                            usize::try_from(offset).unwrap() + usize::from(head_length),
                        ),
                        ptr
                    );
                    assert_eq!(data_len, u32::try_from(cap).unwrap());
                }
                buffer
            })
        })
        .collect::<Vec<_>>();
        assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()) / 4);
    }

    #[test]
    fn rx_distinct() {
        let pool = Pool::new_test_default();
        let mut guard = pool.rx_pending.inner.lock();
        let (descs, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
        assert_eq!(descs.len(), usize::from(DEFAULT_RX_BUFFERS.get()));
        let distinct = descs.iter().map(|desc| desc.get()).collect::<HashSet<u16>>();
        assert_eq!(descs.len(), distinct.len());
    }

    #[test]
    fn alloc_rx_layout() {
        let pool = Pool::new_test_default();
        let mut guard = pool.rx_pending.inner.lock();
        let (descs, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
        assert_eq!(descs.len(), usize::from(DEFAULT_RX_BUFFERS.get()));
        for desc in descs.iter() {
            let descriptor = pool.descriptors.borrow(desc);
            let offset =
                u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap() * u64::from(desc.get());
            assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
            assert_eq!(descriptor.head_length(), 0);
            assert_eq!(descriptor.tail_length(), 0);
            assert_eq!(descriptor.offset(), offset);
            assert_eq!(
                descriptor.data_length(),
                u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
            );
        }
    }

    #[test]
    fn buffer_read_at_write_at() {
        let pool = Pool::new_test_default();
        let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
        let mut buffer =
            pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
        assert_eq!(buffer.parts.len(), 2);
        assert_eq!(buffer.cap(), alloc_bytes);
        let write_buf = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();
        buffer.write_at(0, &write_buf[..]).expect("failed to write into buffer");
        let mut read_buf = [0xff; DEFAULT_BUFFER_LENGTH.get()];
        buffer.read_at(0, &mut read_buf[..]).expect("failed to read from buffer");
        for (idx, byte) in read_buf.iter().enumerate() {
            assert_eq!(*byte, write_buf[idx]);
        }
    }

    #[test]
    fn buffer_read_write_seek() {
        let pool = Pool::new_test_default();
        let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
        let mut buffer =
            pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
        assert_eq!(buffer.parts.len(), 2);
        assert_eq!(buffer.cap(), alloc_bytes);
        let write_buf = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();
        assert_eq!(
            buffer.write(&write_buf[..]).expect("failed to write into buffer"),
            write_buf.len()
        );
        const SEEK_FROM_END: usize = 64;
        const READ_LEN: usize = 12;
        assert_eq!(
            buffer.seek(SeekFrom::End(-i64::try_from(SEEK_FROM_END).unwrap())).unwrap(),
            u64::try_from(buffer.cap() - SEEK_FROM_END).unwrap()
        );
        let mut read_buf = [0xff; READ_LEN];
        assert_eq!(
            buffer.read(&mut read_buf[..]).expect("failed to read from buffer"),
            read_buf.len()
        );
        assert_eq!(&write_buf[..READ_LEN], &read_buf[..]);
    }

    #[test_case(32; "single buffer part")]
    #[test_case(MAX_BUFFER_BYTES; "multiple buffer parts")]
    fn buffer_pad(pad_size: usize) {
        let mut pool = Pool::new_test_default();
        pool.set_min_tx_buffer_length(pad_size);
        for offset in 0..pad_size {
            Arc::get_mut(&mut pool)
                .expect("there are multiple owners of the underlying VMO")
                .fill_sentinel_bytes();
            let mut buffer =
                pool.alloc_tx_buffer_now_or_never(pad_size).expect("failed to allocate buffer");
            buffer.check_write_and_pad(offset, pad_size);
        }
    }

    #[test]
    fn buffer_pad_grow() {
        const BUFFER_PARTS: u8 = 3;
        let mut pool = Pool::new_test_default();
        let pad_size = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
            * u32::from(BUFFER_PARTS)
            - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
            - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
        pool.set_min_tx_buffer_length(pad_size.try_into().unwrap());
        for offset in 0..pad_size - u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap() {
            Arc::get_mut(&mut pool)
                .expect("there are multiple owners of the underlying VMO")
                .fill_sentinel_bytes();
            let mut alloc =
                pool.alloc_tx_now_or_never(BUFFER_PARTS).expect("failed to alloc descriptors");
            alloc
                .init(
                    DEFAULT_BUFFER_LENGTH.get() * usize::from(BUFFER_PARTS)
                        - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
                        - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL),
                )
                .expect("head/body/tail sizes are representable with u16/u32/u16");
            let mut buffer = Buffer::try_from(alloc).unwrap();
            buffer.check_write_and_pad(offset.try_into().unwrap(), pad_size.try_into().unwrap());
        }
    }

    #[test_case(  0; "writes at the beginning")]
    #[test_case( 15; "writes in the first part")]
    #[test_case( 75; "writes in the second part")]
    #[test_case(135; "writes in the third part")]
    #[test_case(195; "writes in the last part")]
    fn buffer_used(write_offset: usize) {
        let pool = Pool::new_test_default();
        let mut buffer =
            pool.alloc_tx_buffer_now_or_never(MAX_BUFFER_BYTES).expect("failed to allocate buffer");
        let expected_caps = (0..netdev::MAX_DESCRIPTOR_CHAIN).map(|i| {
            if i == 0 {
                DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
            } else if i < netdev::MAX_DESCRIPTOR_CHAIN - 1 {
                DEFAULT_BUFFER_LENGTH.get()
            } else {
                DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL)
            }
        });
        assert_eq!(buffer.alloc.len(), netdev::MAX_DESCRIPTOR_CHAIN.into());
        buffer.write_at(write_offset, &[WRITE_BYTE][..]).expect("failed to write to buffer");
        assert_eq!(
            buffer.parts.iter().zip(expected_caps).fold(
                Some(write_offset),
                |offset, (part, expected_cap)| {
                    assert_eq!(part.cap, expected_cap);

                    match offset {
                        Some(offset) => {
                            if offset >= expected_cap {
                                assert_eq!(part.len, part.cap);
                                Some(offset - part.len)
                            } else {
                                assert_eq!(part.len, offset + 1);
                                let mut buf = [0];
                                assert_matches!(part.read_at(offset, &mut buf), Ok(1));
                                assert_eq!(buf[0], WRITE_BYTE);
                                None
                            }
                        }
                        None => {
                            assert_eq!(part.len, 0);
                            None
                        }
                    }
                }
            ),
            None
        )
    }

    #[test]
    fn buffer_commit() {
        let pool = Pool::new_test_default();
        for offset in 0..MAX_BUFFER_BYTES {
            let mut buffer = pool
                .alloc_tx_buffer_now_or_never(MAX_BUFFER_BYTES)
                .expect("failed to allocate buffer");
            buffer.write_at(offset, &[1][..]).expect("failed to write to buffer");
            buffer.commit();
            for (part, descriptor) in buffer.parts.iter().zip(buffer.alloc.descriptors()) {
                let head_length = descriptor.head_length();
                let tail_length = descriptor.tail_length();
                let data_length = descriptor.data_length();
                assert_eq!(u32::try_from(part.len).unwrap(), data_length);
                assert_eq!(
                    u32::from(head_length + tail_length) + data_length,
                    u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap(),
                );
            }
        }
    }

    #[test]
    fn allocate_under_device_minimum() {
        const MIN_TX_DATA: usize = 32;
        const ALLOC_SIZE: usize = 16;
        const WRITE_BYTE: u8 = 0xff;
        const WRITE_SENTINEL_BYTE: u8 = 0xee;
        const READ_SENTINEL_BYTE: u8 = 0xdd;
        let mut config = DEFAULT_CONFIG;
        config.buffer_layout.min_tx_data = MIN_TX_DATA;
        let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create a new pool");
        for mut buffer in Vec::from_iter(std::iter::from_fn({
            let pool = pool.clone();
            move || pool.alloc_tx_buffer_now_or_never(MIN_TX_DATA)
        })) {
            buffer.write_at(0, &[WRITE_SENTINEL_BYTE; MIN_TX_DATA]).expect("failed to write");
        }
        let mut allocated =
            pool.alloc_tx_buffer_now_or_never(16).expect("failed to allocate buffer");
        assert_eq!(allocated.cap(), ALLOC_SIZE);
        const WRITE_BUF_SIZE: usize = ALLOC_SIZE + 1;
        assert_matches!(
            allocated.write_at(0, &[WRITE_BYTE; WRITE_BUF_SIZE]),
            Err(Error::TooSmall { size: ALLOC_SIZE, offset: 0, length: WRITE_BUF_SIZE })
        );
        allocated.write_at(0, &[WRITE_BYTE; ALLOC_SIZE]).expect("failed to write to buffer");
        assert_matches!(allocated.pad(), Ok(()));
        assert_eq!(allocated.cap(), MIN_TX_DATA);
        assert_eq!(allocated.len(), MIN_TX_DATA);
        const READ_BUF_SIZE: usize = MIN_TX_DATA + 1;
        let mut read_buf = [READ_SENTINEL_BYTE; READ_BUF_SIZE];
        assert_matches!(
            allocated.read_at(0, &mut read_buf[..]),
            Err(Error::TooSmall { size: MIN_TX_DATA, offset: 0, length: READ_BUF_SIZE })
        );
        allocated.read_at(0, &mut read_buf[..MIN_TX_DATA]).expect("failed to read from buffer");
        assert_eq!(&read_buf[..ALLOC_SIZE], &[WRITE_BYTE; ALLOC_SIZE][..]);
        assert_eq!(&read_buf[ALLOC_SIZE..MIN_TX_DATA], &[0x0; ALLOC_SIZE][..]);
        assert_eq!(&read_buf[MIN_TX_DATA..], &[READ_SENTINEL_BYTE; 1][..]);
    }

    #[test]
    fn invalid_tx_length() {
        let mut config = DEFAULT_CONFIG;
        config.buffer_layout.length = usize::from(u16::MAX) + 2;
        config.buffer_layout.min_tx_head = 0;
        let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create pool");
        assert_matches!(pool.alloc_tx_buffer(1).now_or_never(), Some(Err(Error::TxLength)));
    }

    #[test]
    fn rx_leases() {
        let mut executor = fuchsia_async::TestExecutor::new();
        let state = RxLeaseHandlingState::new_with_enabled(true);
        let mut watcher = RxLeaseWatcher { state: &state };

        {
            let mut fut = pin!(watcher.wait_until(0));
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
        }
        {
            state.rx_complete();
            let mut fut = pin!(watcher.wait_until(1));
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
        }
        {
            let mut fut = pin!(watcher.wait_until(0));
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
        }
        {
            let mut fut = pin!(watcher.wait_until(3));
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
            state.rx_complete();
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
            state.rx_complete();
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
        }
        let counter_before = state.rx_frame_counter.load(atomic::Ordering::SeqCst);
        {
            let mut fut = pin!(watcher.wait_until(10000));
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
        }
        let counter_after = state.rx_frame_counter.load(atomic::Ordering::SeqCst);
        assert_eq!(counter_before, counter_after);
    }
}