1use fuchsia_sync::Mutex;
8use futures::task::AtomicWaker;
9use std::borrow::Borrow;
10use std::collections::VecDeque;
11use std::convert::TryInto as _;
12use std::fmt::Debug;
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::mem::MaybeUninit;
15use std::num::TryFromIntError;
16use std::ops::{Deref, DerefMut};
17use std::ptr::NonNull;
18use std::sync::Arc;
19use std::sync::atomic::{self, AtomicBool, AtomicU64};
20use std::task::Poll;
21
22use arrayvec::ArrayVec;
23use explicit::ResultExt as _;
24use fidl_fuchsia_hardware_network as netdev;
25use fuchsia_runtime::vmar_root_self;
26use futures::channel::oneshot::{Receiver, Sender, channel};
27
28use super::{ChainLength, DescId, DescRef, DescRefMut, Descriptors};
29use crate::error::{Error, Result};
30use crate::session::{BufferLayout, Config, Pending, Port};
31
/// Pool of tx/rx buffers backed by a data VMO mapped into this process.
///
/// Built by [`Pool::new`], which also hands back the descriptors VMO and the
/// data VMO.
pub(in crate::session) struct Pool {
    /// Base address of the mapped data VMO.
    base: NonNull<u8>,
    /// Size of the mapping in bytes; passed back to `unmap` when dropped.
    bytes: usize,
    /// Descriptor table describing the buffers in the pool.
    descriptors: Descriptors,
    /// Tx free list plus the queue of allocation requests waiting on it.
    tx_alloc_state: Mutex<TxAllocState>,
    /// Rx descriptors that have been returned to the pool.
    // NOTE(review): the consumer of this queue is not visible here.
    pub(in crate::session) rx_pending: Pending<Rx>,
    /// Layout (length and minimum head/data/tail) applied to buffers.
    buffer_layout: BufferLayout,
    /// Bookkeeping used for rx lease handling.
    rx_leases: RxLeaseHandlingState,
}
50
// SAFETY: NOTE(review): `Pool` needs these impls because of the raw `base`
// pointer. The mapping it points at is owned by the pool (it is unmapped in
// `Drop`); access to the bytes is presumably serialized through `DescId`
// ownership — confirm against `Descriptors`.
unsafe impl Send for Pool {}
// SAFETY: same reasoning as for `Send` above.
unsafe impl Sync for Pool {}
57
/// Mutable tx allocation state, kept behind `Pool::tx_alloc_state`.
struct TxAllocState {
    /// Allocation requests waiting for descriptors, served front to back.
    requests: VecDeque<TxAllocReq>,
    /// Tx descriptors currently available for allocation.
    free_list: TxFreeList,
}
64
/// Singly-linked free list of tx descriptors, chained through each
/// descriptor's `nxt` field.
struct TxFreeList {
    /// First free descriptor, or `None` when the list is empty.
    head: Option<DescId<Tx>>,
    /// Number of descriptors currently on the list.
    len: u16,
}
79
80impl Pool {
81 pub(in crate::session) fn new(config: Config) -> Result<(Arc<Self>, zx::Vmo, zx::Vmo)> {
86 let Config { buffer_stride, num_rx_buffers, num_tx_buffers, options, buffer_layout } =
87 config;
88 let num_buffers = num_rx_buffers.get() + num_tx_buffers.get();
89 let (descriptors, descriptors_vmo, tx_free, mut rx_free) =
90 Descriptors::new(num_tx_buffers, num_rx_buffers, buffer_stride)?;
91
92 let free_head = tx_free.into_iter().rev().fold(None, |head, mut curr| {
94 descriptors.borrow_mut(&mut curr).set_nxt(head);
95 Some(curr)
96 });
97
98 for rx_desc in rx_free.iter_mut() {
99 descriptors.borrow_mut(rx_desc).initialize(
100 ChainLength::ZERO,
101 0,
102 buffer_layout.length.try_into().unwrap(),
103 0,
104 );
105 }
106
107 let tx_alloc_state = TxAllocState {
108 free_list: TxFreeList { head: free_head, len: num_tx_buffers.get() },
109 requests: VecDeque::new(),
110 };
111
112 let size = buffer_stride.get() * u64::from(num_buffers);
113 let data_vmo = zx::Vmo::create(size).map_err(|status| Error::Vmo("data", status))?;
114
115 const VMO_NAME: zx::Name =
116 const_unwrap::const_unwrap_result(zx::Name::new("netdevice:data"));
117 data_vmo.set_name(&VMO_NAME).map_err(|status| Error::Vmo("set name", status))?;
118 let len = isize::try_from(size).expect("VMO size larger than isize::MAX") as usize;
123 let base = NonNull::new(
126 vmar_root_self()
127 .map(0, &data_vmo, 0, len, zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE)
128 .map_err(|status| Error::Map("data", status))? as *mut u8,
129 )
130 .unwrap();
131
132 Ok((
133 Arc::new(Pool {
134 base,
135 bytes: len,
136 descriptors,
137 tx_alloc_state: Mutex::new(tx_alloc_state),
138 rx_pending: Pending::new(rx_free),
139 buffer_layout,
140 rx_leases: RxLeaseHandlingState::new_with_flags(options),
141 }),
142 descriptors_vmo,
143 data_vmo,
144 ))
145 }
146
147 pub(in crate::session) async fn alloc_tx(
156 self: &Arc<Self>,
157 num_parts: ChainLength,
158 ) -> AllocGuard<Tx> {
159 let receiver = {
160 let mut state = self.tx_alloc_state.lock();
161 match state.free_list.try_alloc(num_parts, &self.descriptors) {
162 Some(allocated) => {
163 return AllocGuard::new(allocated, self.clone());
164 }
165 None => {
166 let (request, receiver) = TxAllocReq::new(num_parts);
167 state.requests.push_back(request);
168 receiver
169 }
170 }
171 };
172 receiver.await.unwrap()
174 }
175
176 pub(in crate::session) fn try_alloc_single_part_tx_buffer(
181 self: &Arc<Self>,
182 num_bytes: usize,
183 ) -> Result<Option<SinglePartTxBuffer>> {
184 let BufferLayout { min_tx_data: _, min_tx_head, min_tx_tail, length: buffer_length } =
185 self.buffer_layout;
186 if num_bytes > buffer_length - usize::from(min_tx_head) - usize::from(min_tx_tail) {
187 return Err(Error::TxLength);
188 }
189 self.tx_alloc_state
190 .lock()
191 .free_list
192 .try_alloc(ChainLength::try_from(1u8).unwrap(), &self.descriptors)
193 .map(|allocated| -> Result<SinglePartTxBuffer> {
194 let mut alloc = AllocGuard::new(allocated, self.clone());
195 alloc.init(num_bytes)?;
196 let buffer = Buffer::from(alloc);
197 Ok(SinglePartTxBuffer::new(buffer, num_bytes).expect("must be single part"))
198 })
199 .transpose()
200 }
201
202 pub(in crate::session) async fn alloc_tx_buffer(
209 self: &Arc<Self>,
210 num_bytes: usize,
211 ) -> Result<Buffer<Tx>> {
212 self.alloc_tx_buffers(num_bytes).await?.next().unwrap()
213 }
214
215 pub(in crate::session) async fn alloc_tx_buffers<'a>(
228 self: &'a Arc<Self>,
229 num_bytes: usize,
230 ) -> Result<impl Iterator<Item = Result<Buffer<Tx>>> + 'a> {
231 let BufferLayout { min_tx_data, min_tx_head, min_tx_tail, length: buffer_length } =
232 self.buffer_layout;
233 let tx_head = usize::from(min_tx_head);
234 let tx_tail = usize::from(min_tx_tail);
235 let total_bytes = num_bytes.max(min_tx_data) + tx_head + tx_tail;
236 let num_parts = (total_bytes + buffer_length - 1) / buffer_length;
237 let chain_length = ChainLength::try_from(num_parts)?;
238 let first = self.alloc_tx(chain_length).await;
239 let iter = std::iter::once(first)
240 .chain(std::iter::from_fn(move || {
241 let mut state = self.tx_alloc_state.lock();
242 state
243 .free_list
244 .try_alloc(chain_length, &self.descriptors)
245 .map(|allocated| AllocGuard::new(allocated, self.clone()))
246 }))
247 .fuse()
250 .map(move |mut alloc| {
251 alloc.init(num_bytes)?;
252 Ok(alloc.into())
253 });
254 Ok(iter)
255 }
256
257 pub(in crate::session) fn free_rx(&self, descs: impl IntoIterator<Item = DescId<Rx>>) {
259 self.rx_pending.extend(descs.into_iter().map(|mut desc| {
260 self.descriptors.borrow_mut(&mut desc).initialize(
261 ChainLength::ZERO,
262 0,
263 self.buffer_layout.length.try_into().unwrap(),
264 0,
265 );
266 desc
267 }));
268 }
269
    /// Puts `chain` back on the tx free list and then serves as many queued
    /// allocation requests as the free list allows.
    ///
    /// Requests are fulfilled only while the lock is released, because a
    /// fulfillment whose receiver is gone drops the guard, which re-enters
    /// this function.
    fn free_tx(self: &Arc<Self>, chain: Chained<DescId<Tx>>) {
        // Allocations ready to be handed out are staged on the stack so they
        // can be delivered after the lock is dropped.
        let mut to_fulfill = ArrayVec::<
            (TxAllocReq, AllocGuard<Tx>),
            { netdev::MAX_DESCRIPTOR_CHAIN as usize },
        >::new();

        let mut state = self.tx_alloc_state.lock();

        {
            let mut descs = chain.into_iter();
            state.free_list.len += u16::try_from(descs.len()).unwrap();
            // The chain's first descriptor becomes the new head of the list...
            let head = descs.next();
            let old_head = std::mem::replace(&mut state.free_list.head, head);
            // ...and its last descriptor (the head itself for a single-part
            // chain) is linked to the previous head.
            let mut tail = descs.last();
            let mut tail_ref = self.descriptors.borrow_mut(
                tail.as_mut().unwrap_or_else(|| state.free_list.head.as_mut().unwrap()),
            );
            tail_ref.set_nxt(old_head);
        }

        // Serve queued requests in order, stopping at the first one that the
        // free list cannot satisfy.
        while let Some(req) = state.requests.front() {
            // Discard requests whose receiver has already gone away.
            if req.sender.is_canceled() {
                let _cancelled: Option<TxAllocReq> = state.requests.pop_front();
                continue;
            }
            let size = req.size;
            match state.free_list.try_alloc(size, &self.descriptors) {
                Some(descs) => {
                    // `front()` returned `Some` above, so the queue is not empty.
                    let req = state.requests.pop_front().unwrap();
                    to_fulfill.push((req, AllocGuard::new(descs, self.clone())));

                    // The staging area is full: release the lock, deliver what
                    // we have, then re-lock and keep going.
                    if to_fulfill.is_full() {
                        drop(state);
                        for (req, alloc) in to_fulfill.drain(..) {
                            req.fulfill(alloc)
                        }
                        state = self.tx_alloc_state.lock();
                    }
                }
                None => break,
            }
        }

        // Deliver the remaining staged allocations without holding the lock.
        drop(state);
        for (req, alloc) in to_fulfill {
            req.fulfill(alloc)
        }
    }
341
342 pub(in crate::session) fn tx_completed(self: &Arc<Self>, head: DescId<Tx>) -> Result<()> {
346 let chain = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
347 Ok(self.free_tx(chain))
348 }
349
350 pub(in crate::session) fn rx_completed(
356 self: &Arc<Self>,
357 head: DescId<Rx>,
358 ) -> Result<Buffer<Rx>> {
359 let descs = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
360 let alloc = AllocGuard::new(descs, self.clone());
361 Ok(alloc.into())
362 }
363
    /// Returns the data region of the buffer described by `desc`.
    ///
    /// The region starts at the descriptor's offset plus its head length,
    /// relative to the mapping base, and spans the descriptor's data length.
    /// The returned borrow is tied to `desc`, not to the pool.
    fn get_slice<'a, K: AllocKind>(&self, desc: &'a DescId<K>) -> &'a [u8] {
        let desc = self.descriptors.borrow(desc);
        let offset = usize::try_from(desc.offset() + u64::from(desc.head_length()))
            .expect("usize must hold u64");
        let len = usize::try_from(desc.data_length()).expect("usize must hold u32");
        // SAFETY: NOTE(review): relies on the descriptor's offset, head and
        // data lengths staying within the `bytes`-long mapping at `base`, and
        // on `DescId` ownership ruling out concurrent mutable access to the
        // same region — confirm against `Descriptors`.
        unsafe {
            let ptr = self.base.as_ptr().add(offset);
            std::slice::from_raw_parts(ptr, len)
        }
    }
378
    /// Returns the data region of the buffer described by `desc`, mutably.
    ///
    /// The region starts at the descriptor's offset plus its head length,
    /// relative to the mapping base, and spans the descriptor's data length.
    /// The returned borrow is tied to the exclusive borrow of `desc`.
    fn get_slice_mut<'a, K: AllocKind>(&self, desc: &'a mut DescId<K>) -> &'a mut [u8] {
        let desc = self.descriptors.borrow_mut(desc);
        let offset = usize::try_from(desc.offset() + u64::from(desc.head_length()))
            .expect("usize must hold u64");
        let len = usize::try_from(desc.data_length()).expect("usize must hold u32");
        // SAFETY: NOTE(review): relies on the descriptor's offset, head and
        // data lengths staying within the `bytes`-long mapping at `base`, and
        // on the exclusive borrow of `desc` implying exclusive access to the
        // region — confirm against `Descriptors`.
        unsafe {
            let ptr = self.base.as_ptr().add(offset);
            std::slice::from_raw_parts_mut(ptr, len)
        }
    }
393}
394
/// Unmaps the data VMO mapping when the pool goes away.
impl Drop for Pool {
    fn drop(&mut self) {
        // SAFETY: `base` and `bytes` are the address and length of the mapping
        // created in `Pool::new`. NOTE(review): soundness relies on no slice
        // into the mapping outliving the pool; `AllocGuard` keeps an
        // `Arc<Pool>` alive for that purpose — confirm no other path hands
        // out slices.
        unsafe {
            vmar_root_self()
                .unmap(self.base.as_ptr() as usize, self.bytes)
                .expect("failed to unmap VMO for Pool")
        }
    }
}
404
impl TxFreeList {
    /// Pops `num_parts` descriptors off the front of the free list.
    ///
    /// Returns `None`, leaving the list untouched, when fewer than
    /// `num_parts` descriptors are free.
    fn try_alloc(
        &mut self,
        num_parts: ChainLength,
        descriptors: &Descriptors,
    ) -> Option<Chained<DescId<Tx>>> {
        if u16::from(num_parts.get()) > self.len {
            return None;
        }

        // Iterator that pops the current head and advances `self.head` to the
        // descriptor the old head pointed at through `nxt`.
        let free_list = std::iter::from_fn(|| -> Option<DescId<Tx>> {
            let new_head = self.head.as_ref().and_then(|head| {
                let nxt = descriptors.borrow(head).nxt();
                // SAFETY: NOTE(review): presumably sound because ids linked
                // from the free list are not owned by anyone else — confirm
                // against the contract of `DescId::from_raw`.
                nxt.map(|id| unsafe {
                    DescId::from_raw(id)
                })
            });
            std::mem::replace(&mut self.head, new_head)
        });
        let allocated = free_list.take(num_parts.get().into()).collect::<Chained<_>>();
        // The length check above means the list must have had enough entries.
        assert_eq!(allocated.len(), num_parts.into());
        self.len -= u16::from(num_parts.get());
        Some(allocated)
    }
}
435
/// A buffer made of one or more descriptor-backed parts from a [`Pool`].
///
/// `K` selects whether this is a tx or an rx buffer.
pub struct Buffer<K: AllocKind> {
    /// The descriptors making up the buffer, returned to the pool on drop.
    alloc: AllocGuard<K>,
}
441
442impl<K: AllocKind> Buffer<K> {
443 pub fn len(&self) -> usize {
445 self.parts().map(|s| s.len()).sum()
446 }
447
448 fn parts(&self) -> impl Iterator<Item = &[u8]> + '_ {
450 self.alloc.descs.iter().map(|desc| self.alloc.pool.get_slice(desc))
451 }
452
453 fn parts_mut(&mut self) -> impl Iterator<Item = &mut [u8]> + '_ {
455 self.alloc.descs.iter_mut().map(|desc| self.alloc.pool.get_slice_mut(desc))
456 }
457
458 pub(in crate::session) fn leak(mut self) -> DescId<K> {
460 let descs = std::mem::replace(&mut self.alloc.descs, Chained::empty());
461 descs.into_iter().next().unwrap()
462 }
463
464 pub fn frame_type(&self) -> Result<netdev::FrameType> {
466 self.alloc.descriptor().frame_type()
467 }
468
469 pub fn port(&self) -> Port {
471 self.alloc.descriptor().port()
472 }
473
474 pub fn as_slice(&self) -> Option<&[u8]> {
476 if self.alloc.len() != 1 {
477 return None;
478 }
479 self.parts().next()
480 }
481
482 pub fn as_slice_mut(&mut self) -> Option<&mut [u8]> {
484 if self.alloc.len() != 1 {
485 return None;
486 }
487 self.parts_mut().next()
488 }
489
490 pub fn io(&self) -> BufferIORef<'_, K> {
492 let mut len = 0;
493 let parts: Chained<&[u8]> = self.parts().inspect(|s| len += s.len()).collect();
494 BufferIO { parts, pos: 0, len, _marker: std::marker::PhantomData }
495 }
496
497 pub fn io_mut(&mut self) -> BufferIOMut<'_, K> {
499 let mut len = 0;
500 let parts: Chained<&mut [u8]> = self.parts_mut().inspect(|s| len += s.len()).collect();
501 BufferIO { parts, pos: 0, len, _marker: std::marker::PhantomData }
502 }
503}
504
505impl Buffer<Tx> {
506 pub fn set_port(&mut self, port: Port) {
508 self.alloc.descriptor_mut().set_port(port)
509 }
510
511 pub fn set_frame_type(&mut self, frame_type: netdev::FrameType) {
513 self.alloc.descriptor_mut().set_frame_type(frame_type)
514 }
515
516 pub fn set_tx_flags(&mut self, flags: netdev::TxFlags) {
518 self.alloc.descriptor_mut().set_tx_flags(flags)
519 }
520}
521
522impl Buffer<Rx> {
523 pub async fn into_tx(self) -> Buffer<Tx> {
525 let Buffer { alloc } = self;
526 Buffer { alloc: alloc.into_tx().await }
527 }
528
529 pub fn rx_flags(&self) -> Result<netdev::RxFlags> {
531 self.alloc.descriptor().rx_flags()
532 }
533}
534
535impl<K: AllocKind> Debug for Buffer<K> {
536 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
537 let Self { alloc } = self;
538 f.debug_struct("Buffer").field("alloc", alloc).finish()
539 }
540}
541
/// A tx [`Buffer`] that is known to consist of exactly one part, so its data
/// can be exposed as a single contiguous slice.
pub struct SinglePartTxBuffer(Buffer<Tx>);
545
546impl SinglePartTxBuffer {
547 pub fn new(buffer: Buffer<Tx>, len: usize) -> Option<Self> {
550 if buffer.alloc.len() != 1 {
551 None
552 } else {
553 let cap = usize::try_from(buffer.alloc.descriptor().data_length())
554 .expect("u32 must fit in a usize");
555 if cap < len {
556 return None;
557 }
558 Some(Self(buffer))
559 }
560 }
561
562 pub fn into_inner(self) -> Buffer<Tx> {
564 let Self(buffer) = self;
565 buffer
566 }
567}
568
569impl AsRef<[u8]> for SinglePartTxBuffer {
570 fn as_ref(&self) -> &[u8] {
571 let desc = unsafe { self.0.alloc.descs.storage[0].assume_init_ref() };
574 self.0.alloc.pool.get_slice(desc)
575 }
576}
577
578impl AsMut<[u8]> for SinglePartTxBuffer {
579 fn as_mut(&mut self) -> &mut [u8] {
580 let desc = unsafe { self.0.alloc.descs.storage[0].assume_init_mut() };
583 self.0.alloc.pool.get_slice_mut(desc)
584 }
585}
586
587impl packet::FragmentedBuffer for SinglePartTxBuffer {
588 fn len(&self) -> usize {
589 let desc = self.0.alloc.descriptor();
590 usize::try_from(desc.data_length()).expect("u32 must fit in a usize")
591 }
592
593 fn with_bytes<'a, R, F>(&'a self, f: F) -> R
594 where
595 F: for<'b> FnOnce(packet::FragmentedBytes<'b, 'a>) -> R,
596 {
597 f(packet::FragmentedBytes::new(&mut [self.as_ref()][..]))
598 }
599}
600
/// Cursor-style I/O view over the parts of a [`Buffer`].
///
/// `T` is the part type (a shared or mutable byte slice) and `K` is the
/// allocation kind of the buffer the view was created from.
pub struct BufferIO<T, K: AllocKind> {
    /// Data regions of the buffer, in chain order.
    parts: Chained<T>,
    /// Current position used by the `Read`/`Write`/`Seek` impls.
    pos: usize,
    /// Sum of the lengths of all parts.
    len: usize,
    /// Ties the view to the allocation kind without storing a `K`.
    _marker: std::marker::PhantomData<K>,
}
611
/// Read-only I/O view, as returned by [`Buffer::io`].
pub type BufferIORef<'a, K> = BufferIO<&'a [u8], K>;
/// Mutable I/O view, as returned by [`Buffer::io_mut`].
pub type BufferIOMut<'a, K> = BufferIO<&'a mut [u8], K>;
614
615impl<T> BufferIO<T, Tx>
616where
617 T: AsMut<[u8]>,
618{
619 pub fn write_at(&mut self, mut offset: usize, src: &[u8]) -> usize {
631 let mut total = 0;
632
633 for slice in self.parts.iter_mut() {
634 let slice = slice.as_mut();
635 if offset < slice.len() {
636 let available = slice.len() - offset;
637 let to_copy = std::cmp::min(src.len() - total, available);
638 slice[offset..offset + to_copy].copy_from_slice(&src[total..total + to_copy]);
639 total += to_copy;
640 offset = 0;
641 if total == src.len() {
642 break;
643 }
644 } else {
645 offset -= slice.len();
646 }
647 }
648 total
649 }
650}
651
652impl<T, K: AllocKind> BufferIO<T, K>
653where
654 T: AsRef<[u8]>,
655{
656 pub fn read_at(&self, mut offset: usize, dst: &mut [u8]) -> usize {
667 let mut total = 0;
668
669 for slice in self.parts.iter() {
670 let slice = slice.as_ref();
671 if offset < slice.len() {
672 let available = slice.len() - offset;
673 let to_copy = std::cmp::min(dst.len() - total, available);
674 dst[total..total + to_copy].copy_from_slice(&slice[offset..offset + to_copy]);
675 total += to_copy;
676 offset = 0;
677 if total == dst.len() {
678 break;
679 }
680 } else {
681 offset -= slice.len();
682 }
683 }
684 total
685 }
686}
687
impl AllocGuard<Rx> {
    /// Converts an rx allocation into a tx allocation of the same chain
    /// length by allocating tx descriptors and swapping the descriptor ids
    /// between the two guards.
    ///
    /// The returned guard holds the former rx ids; `self` leaves holding the
    /// freshly allocated tx ids and is dropped as an rx guard on return.
    async fn into_tx(mut self) -> AllocGuard<Tx> {
        let mut tx = self.pool.alloc_tx(self.descs.len).await;
        // SAFETY: NOTE(review): relies on `DescId<Rx>` and `DescId<Tx>`
        // having identical layout so that the two storage arrays can be
        // swapped in place — confirm against the definition of `DescId`.
        std::mem::swap(&mut self.descs.storage, unsafe {
            std::mem::transmute(&mut tx.descs.storage)
        });
        tx
    }
}
707
/// Fixed-capacity, stack-allocated sequence holding up to
/// `MAX_DESCRIPTOR_CHAIN` elements.
struct Chained<T> {
    /// Backing storage; only the first `len` slots are initialized.
    storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
    /// Number of initialized elements at the front of `storage`.
    len: ChainLength,
}
713
/// Views the initialized prefix of the storage as a slice.
impl<T> Deref for Chained<T> {
    type Target = [T];

    fn deref(&self) -> &Self::Target {
        // SAFETY: the first `len` slots are the initialized ones (see
        // `FromIterator`), and `MaybeUninit<T>` has the same layout as `T`.
        unsafe { std::mem::transmute(&self.storage[..self.len.into()]) }
    }
}
722
/// Views the initialized prefix of the storage as a mutable slice.
impl<T> DerefMut for Chained<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        // SAFETY: the first `len` slots are the initialized ones (see
        // `FromIterator`), and `MaybeUninit<T>` has the same layout as `T`.
        unsafe { std::mem::transmute(&mut self.storage[..self.len.into()]) }
    }
}
729
/// Drops the initialized elements; the uninitialized tail is left alone.
impl<T> Drop for Chained<T> {
    fn drop(&mut self) {
        // SAFETY: `deref_mut` covers exactly the initialized elements, and
        // they are not touched again after this point.
        unsafe {
            std::ptr::drop_in_place(self.deref_mut());
        }
    }
}
738
739impl<T: Debug> Debug for Chained<T> {
740 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
741 f.debug_list().entries(self.iter()).finish()
742 }
743}
744
745impl<T> Chained<T> {
746 #[allow(clippy::uninit_assumed_init)]
747 fn empty() -> Self {
748 Self { storage: unsafe { MaybeUninit::uninit().assume_init() }, len: ChainLength::ZERO }
755 }
756}
757
758impl<T> FromIterator<T> for Chained<T> {
759 fn from_iter<I: IntoIterator<Item = T>>(elements: I) -> Self {
764 let mut result = Self::empty();
765 let mut len = 0u8;
766 for (idx, e) in elements.into_iter().enumerate() {
767 result.storage[idx] = MaybeUninit::new(e);
768 len += 1;
769 }
770 assert!(len > 0);
771 result.len = ChainLength::try_from(len).unwrap();
774 result
775 }
776}
777
/// Consumes the chain, yielding its elements by value.
impl<T> IntoIterator for Chained<T> {
    type Item = T;
    type IntoIter = ChainedIter<T>;

    fn into_iter(mut self) -> Self::IntoIter {
        let len = self.len;
        // Zero the length first so that `Chained::drop` does not drop the
        // elements that are being handed over to the iterator.
        self.len = ChainLength::ZERO;
        // SAFETY: an array of `MaybeUninit` does not require initialization,
        // so `assume_init` on the uninitialized array itself is fine.
        #[allow(clippy::uninit_assumed_init)]
        let storage =
            std::mem::replace(&mut self.storage, unsafe { MaybeUninit::uninit().assume_init() });
        ChainedIter { storage, len, consumed: 0 }
    }
}
797
/// By-value iterator over the elements of a [`Chained`].
struct ChainedIter<T> {
    /// Storage taken from the chain; slots in `consumed..len` are still
    /// initialized.
    storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
    /// Number of elements the chain held when iteration started.
    len: ChainLength,
    /// Number of elements already yielded.
    consumed: u8,
}
803
impl<T> Iterator for ChainedIter<T> {
    type Item = T;

    /// Moves the next not-yet-consumed element out of the storage.
    fn next(&mut self) -> Option<Self::Item> {
        if self.consumed < self.len.get() {
            // SAFETY: slots in `consumed..len` are initialized; the slot is
            // replaced with an uninitialized one and `consumed` is advanced so
            // it is never read again.
            let value = unsafe {
                std::mem::replace(
                    &mut self.storage[usize::from(self.consumed)],
                    MaybeUninit::uninit(),
                )
                .assume_init()
            };
            self.consumed += 1;
            Some(value)
        } else {
            None
        }
    }

    /// Reports the exact number of remaining elements.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let len = usize::from(self.len.get() - self.consumed);
        (len, Some(len))
    }
}
830
// `size_hint` returns an exact count, so `len()` can be provided.
impl<T> ExactSizeIterator for ChainedIter<T> {}
832
/// Drops the elements that were never yielded.
impl<T> Drop for ChainedIter<T> {
    fn drop(&mut self) {
        // SAFETY: slots in `consumed..len` are still initialized and are not
        // accessed again after being dropped here.
        unsafe {
            std::ptr::drop_in_place(std::mem::transmute::<_, &mut [T]>(
                &mut self.storage[self.consumed.into()..self.len.into()],
            ));
        }
    }
}
843
/// Owns a chain of descriptors and returns them to the pool when dropped.
pub(in crate::session) struct AllocGuard<K: AllocKind> {
    /// The owned descriptor ids, head first.
    descs: Chained<DescId<K>>,
    /// The pool the descriptors belong to; also keeps the pool alive.
    pool: Arc<Pool>,
}
849
850impl<K: AllocKind> Debug for AllocGuard<K> {
851 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
852 let Self { descs, pool: _ } = self;
853 f.debug_struct("AllocGuard").field("descs", descs).finish()
854 }
855}
856
857impl<K: AllocKind> AllocGuard<K> {
858 fn new(descs: Chained<DescId<K>>, pool: Arc<Pool>) -> Self {
859 Self { descs, pool }
860 }
861
862 fn descriptors(&self) -> impl Iterator<Item = DescRef<'_, K>> + '_ {
864 self.descs.iter().map(move |desc| self.pool.descriptors.borrow(desc))
865 }
866
867 fn descriptors_mut(&mut self) -> impl Iterator<Item = DescRefMut<'_, K>> + '_ {
869 let descriptors = &self.pool.descriptors;
870 self.descs.iter_mut().map(move |desc| descriptors.borrow_mut(desc))
871 }
872
873 fn descriptor(&self) -> DescRef<'_, K> {
875 self.descriptors().next().expect("descriptors must not be empty")
876 }
877
878 fn descriptor_mut(&mut self) -> DescRefMut<'_, K> {
880 self.descriptors_mut().next().expect("descriptors must not be empty")
881 }
882}
883
impl AllocGuard<Tx> {
    /// Initializes every descriptor of the chain for a frame of
    /// `requested_bytes`, padded up to the layout's `min_tx_data`.
    ///
    /// The first part reserves the minimum head, the last part reserves the
    /// minimum tail, and any space left unused in a part is added to that
    /// part's tail. Padding bytes beyond `requested_bytes` are zeroed.
    ///
    /// # Errors
    ///
    /// Returns `Error::TxLength` if a part's tail length does not fit in a
    /// `u16`.
    ///
    /// # Panics
    ///
    /// Panics if the chain cannot hold the target length.
    fn init(&mut self, requested_bytes: usize) -> Result<()> {
        let len = self.len();
        let BufferLayout { min_tx_head, min_tx_tail, length: buffer_length, min_tx_data } =
            self.pool.buffer_layout;

        // Bytes still to be covered by data regions (request plus padding)...
        let target_len = requested_bytes.max(usize::from(min_tx_data));
        let mut remaining_target = target_len;
        // ...and the portion of those that the caller actually asked for.
        let mut remaining_requested = requested_bytes;

        // `clen` counts the parts that follow the current one in the chain.
        for (desc_id, clen) in self.descs.iter_mut().zip((0..len).rev()) {
            let chain_length = ChainLength::try_from(clen).unwrap();
            // Head space is reserved in the first part only, tail space in the
            // last part only.
            let head_length = if clen + 1 == len { min_tx_head } else { 0 };
            let mut tail_length = if clen == 0 { min_tx_tail } else { 0 };

            let available_bytes =
                u32::try_from(buffer_length - usize::from(head_length) - usize::from(tail_length))
                    .unwrap();

            let data_length = match u32::try_from(remaining_target) {
                Ok(target) => {
                    if target < available_bytes {
                        let excess = available_bytes - target;
                        // Fold the unused space into the tail so the data
                        // region is exactly as long as what remains.
                        tail_length = u16::try_from(excess)
                            .ok_checked::<TryFromIntError>()
                            .and_then(|tail_adjustment| tail_length.checked_add(tail_adjustment))
                            .ok_or(Error::TxLength)?;
                    }
                    target.min(available_bytes)
                }
                // More than `u32::MAX` bytes remain: this part is filled completely.
                Err(TryFromIntError { .. }) => available_bytes,
            };

            let data_length_usize = usize::try_from(data_length).expect("u32 must fit in a usize");
            let requested_in_part = std::cmp::min(remaining_requested, data_length_usize);
            let pad_in_part = data_length_usize - requested_in_part;

            {
                let mut descriptor = self.pool.descriptors.borrow_mut(desc_id);
                descriptor.initialize(chain_length, head_length, data_length, tail_length);
            }

            // Zero the bytes that exist only to satisfy `min_tx_data`.
            if pad_in_part > 0 {
                let slice = self.pool.get_slice_mut(desc_id);
                slice[requested_in_part..requested_in_part + pad_in_part].fill(0);
            }

            remaining_target -= data_length_usize;
            remaining_requested -= requested_in_part;
        }
        assert_eq!(remaining_target, 0);
        Ok(())
    }
}
969
970impl<K: AllocKind> Drop for AllocGuard<K> {
971 fn drop(&mut self) {
972 if self.is_empty() {
973 return;
974 }
975 K::free(private::Allocation(self));
976 }
977}
978
979impl<K: AllocKind> Deref for AllocGuard<K> {
980 type Target = [DescId<K>];
981
982 fn deref(&self) -> &Self::Target {
983 self.descs.deref()
984 }
985}
986
987impl<K: AllocKind> From<AllocGuard<K>> for Buffer<K> {
988 fn from(alloc: AllocGuard<K>) -> Self {
989 Self { alloc }
990 }
991}
992
993impl<T, K: AllocKind> Read for BufferIO<T, K>
994where
995 T: AsRef<[u8]>,
996{
997 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
998 let read_len = self.read_at(self.pos, buf);
999 self.pos += read_len;
1000 Ok(read_len)
1001 }
1002}
1003
1004impl<T> Write for BufferIO<T, Tx>
1005where
1006 T: AsMut<[u8]>,
1007{
1008 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1009 let write_len = self.write_at(self.pos, buf);
1010 self.pos += write_len;
1011 Ok(write_len)
1012 }
1013
1014 fn flush(&mut self) -> std::io::Result<()> {
1015 Ok(())
1016 }
1017}
1018
1019impl<T, K: AllocKind> Seek for BufferIO<T, K> {
1020 fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
1021 let pos = match pos {
1022 SeekFrom::Start(offset) => offset,
1023 SeekFrom::End(offset) => {
1024 let end = i64::try_from(self.len).unwrap();
1025 u64::try_from(end.wrapping_add(offset)).unwrap()
1026 }
1027 SeekFrom::Current(offset) => {
1028 let current = i64::try_from(self.pos).map_err(|TryFromIntError { .. }| {
1029 std::io::Error::from(std::io::ErrorKind::InvalidInput)
1030 })?;
1031 u64::try_from(current.wrapping_add(offset)).unwrap()
1032 }
1033 };
1034 self.pos = usize::try_from(pos).map_err(|TryFromIntError { .. }| {
1035 std::io::Error::from(std::io::ErrorKind::InvalidInput)
1036 })?;
1037 Ok(pos)
1038 }
1039}
1040
/// A queued request for a tx allocation that could not be served right away.
struct TxAllocReq {
    /// Channel used to deliver the allocation to the waiting task.
    sender: Sender<AllocGuard<Tx>>,
    /// Number of descriptors requested.
    size: ChainLength,
}
1046
1047impl TxAllocReq {
1048 fn new(size: ChainLength) -> (Self, Receiver<AllocGuard<Tx>>) {
1049 let (sender, receiver) = channel();
1050 (TxAllocReq { sender, size }, receiver)
1051 }
1052
1053 fn fulfill(self, guard: AllocGuard<Tx>) {
1061 let Self { sender, size: _ } = self;
1062 match sender.send(guard) {
1063 Ok(()) => (),
1064 Err(guard) => {
1065 drop(guard);
1068 }
1069 }
1070 }
1071}
1072
/// Items that appear in the `AllocKind` interface but cannot be named or
/// constructed outside this file.
mod private {
    use super::{AllocKind, Rx, Tx};
    /// Supertrait restricting `AllocKind` implementors to `Rx` and `Tx`.
    pub trait Sealed: 'static + Sized {}
    impl Sealed for Rx {}
    impl Sealed for Tx {}

    /// Wrapper around the guard being freed, passed to `AllocKind::free`.
    pub struct Allocation<'a, K: AllocKind>(pub(super) &'a mut super::AllocGuard<K>);
}
1086
/// Marker for the kind of an allocation; implemented only by [`Tx`] and
/// [`Rx`].
pub trait AllocKind: private::Sealed {
    /// Value-level reflection of the kind.
    const REFL: AllocKindRefl;

    /// Returns the descriptors of `alloc` to the pool; called when the guard
    /// is dropped.
    fn free(alloc: private::Allocation<'_, Self>);
}
1096
/// Type-level tag for tx allocations; has no values.
pub enum Tx {}
/// Type-level tag for rx allocations; has no values.
pub enum Rx {}
1101
/// Value-level counterpart of the [`Tx`]/[`Rx`] type tags.
pub enum AllocKindRefl {
    /// Corresponds to [`Tx`].
    Tx,
    /// Corresponds to [`Rx`].
    Rx,
}
1107
1108impl AllocKindRefl {
1109 pub(in crate::session) fn as_str(&self) -> &'static str {
1110 match self {
1111 AllocKindRefl::Tx => "Tx",
1112 AllocKindRefl::Rx => "Rx",
1113 }
1114 }
1115}
1116
1117impl AllocKind for Tx {
1118 const REFL: AllocKindRefl = AllocKindRefl::Tx;
1119
1120 fn free(alloc: private::Allocation<'_, Self>) {
1121 let private::Allocation(AllocGuard { pool, descs }) = alloc;
1122 pool.free_tx(std::mem::replace(descs, Chained::empty()));
1123 }
1124}
1125
1126impl AllocKind for Rx {
1127 const REFL: AllocKindRefl = AllocKindRefl::Rx;
1128
1129 fn free(alloc: private::Allocation<'_, Self>) {
1130 let private::Allocation(AllocGuard { pool, descs }) = alloc;
1131 pool.free_rx(std::mem::replace(descs, Chained::empty()));
1132 pool.rx_leases.rx_complete();
1133 }
1134}
1135
/// Shared state used to delay actions until a given number of rx frames has
/// been completed.
pub(in crate::session) struct RxLeaseHandlingState {
    /// Whether a watcher may still be created; cleared by
    /// `RxLeaseWatcher::new`.
    can_watch_rx_leases: AtomicBool,
    /// Count of completed rx frames; temporarily offset by a waiting watcher.
    rx_frame_counter: AtomicU64,
    /// Waker of the task waiting in `RxLeaseWatcher::wait_until`.
    rx_lease_waker: AtomicWaker,
}
1151
1152impl RxLeaseHandlingState {
1153 fn new_with_flags(flags: netdev::SessionFlags) -> Self {
1154 Self::new_with_enabled(flags.contains(netdev::SessionFlags::RECEIVE_RX_POWER_LEASES))
1155 }
1156
1157 fn new_with_enabled(enabled: bool) -> Self {
1158 Self {
1159 can_watch_rx_leases: AtomicBool::new(enabled),
1160 rx_frame_counter: AtomicU64::new(0),
1161 rx_lease_waker: AtomicWaker::new(),
1162 }
1163 }
1164
1165 fn rx_complete(&self) {
1168 let Self { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } = self;
1169 let prev = rx_frame_counter.fetch_add(1, atomic::Ordering::SeqCst);
1170
1171 if prev == u64::MAX {
1174 rx_lease_waker.wake();
1175 }
1176 }
1177}
1178
/// Something that gives access to an [`RxLeaseHandlingState`].
pub(in crate::session) trait RxLeaseHandlingStateContainer {
    /// Returns the lease handling state held by this container.
    fn lease_handling_state(&self) -> &RxLeaseHandlingState;
}
1184
1185impl<T: Borrow<RxLeaseHandlingState>> RxLeaseHandlingStateContainer for T {
1186 fn lease_handling_state(&self) -> &RxLeaseHandlingState {
1187 self.borrow()
1188 }
1189}
1190
1191impl RxLeaseHandlingStateContainer for Arc<Pool> {
1192 fn lease_handling_state(&self) -> &RxLeaseHandlingState {
1193 &self.rx_leases
1194 }
1195}
1196
/// Waits for rx frame counts to be reached on a lease handling state.
pub(in crate::session) struct RxLeaseWatcher<T> {
    /// Container giving access to the shared `RxLeaseHandlingState`.
    state: T,
}
1201
impl<T: RxLeaseHandlingStateContainer> RxLeaseWatcher<T> {
    /// Creates the watcher for `state`.
    ///
    /// # Panics
    ///
    /// Panics if watching is not enabled on the state or if a watcher was
    /// already created for it: the flag is swapped to `false` here, so only
    /// the first call can succeed.
    pub(in crate::session) fn new(state: T) -> Self {
        assert!(
            state.lease_handling_state().can_watch_rx_leases.swap(false, atomic::Ordering::SeqCst),
            "can't watch rx leases"
        );
        Self { state }
    }

    /// Waits until the rx frame counter has reached `hold_until_frame`.
    ///
    /// The counter is temporarily decremented by `hold_until_frame`; the
    /// amount is added back when this future completes or is dropped.
    pub(in crate::session) async fn wait_until(&mut self, hold_until_frame: u64) {
        let RxLeaseHandlingState { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } =
            self.state.lease_handling_state();

        let prev = rx_frame_counter.fetch_sub(hold_until_frame, atomic::Ordering::SeqCst);
        // Restore the counter on every exit path, including cancellation.
        let _guard = scopeguard::guard((), |()| {
            let _: u64 = rx_frame_counter.fetch_add(hold_until_frame, atomic::Ordering::SeqCst);
        });

        // The counter had already reached the target; nothing to wait for.
        if prev >= hold_until_frame {
            return;
        }
        // Otherwise the subtraction wrapped around and the counter now holds
        // `threshold`. Completed frames increment it until it wraps past
        // `u64::MAX` (which is when `rx_complete` wakes us), after which it
        // reads below `threshold`.
        let threshold = prev.wrapping_sub(hold_until_frame);
        futures::future::poll_fn(|cx| {
            let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
            if v < threshold {
                return Poll::Ready(());
            }
            rx_lease_waker.register(cx.waker());
            // Check again after registering so that a wrap-around happening
            // between the first load and the registration is not missed.
            let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
            if v < threshold {
                return Poll::Ready(());
            }
            Poll::Pending
        })
        .await;
    }
}
1266
1267#[cfg(test)]
1268mod tests {
1269
1270 use super::*;
1271
1272 use assert_matches::assert_matches;
1273 use fuchsia_async as fasync;
1274 use futures::future::FutureExt;
1275 use test_case::test_case;
1276
1277 use std::collections::HashSet;
1278 use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize};
1279 use std::pin::pin;
1280 use std::task::{Poll, Waker};
1281
    // Layout used by the default test pool: 64-byte buffers with 4 bytes of
    // head and 8 bytes of tail reserved on tx.
    const DEFAULT_MIN_TX_BUFFER_HEAD: u16 = 4;
    const DEFAULT_MIN_TX_BUFFER_TAIL: u16 = 8;
    const DEFAULT_BUFFER_LENGTH: NonZeroUsize = NonZeroUsize::new(64).unwrap();
    const DEFAULT_TX_BUFFERS: NonZeroU16 = NonZeroU16::new(8).unwrap();
    const DEFAULT_RX_BUFFERS: NonZeroU16 = NonZeroU16::new(8).unwrap();
    // Largest payload a maximum-length chain can carry with the default layout.
    const MAX_BUFFER_BYTES: usize = DEFAULT_BUFFER_LENGTH.get()
        * netdev::MAX_DESCRIPTOR_CHAIN as usize
        - DEFAULT_MIN_TX_BUFFER_HEAD as usize
        - DEFAULT_MIN_TX_BUFFER_TAIL as usize;

    // Marker bytes used to tell untouched, written and padded regions apart.
    const SENTINEL_BYTE: u8 = 0xab;
    const WRITE_BYTE: u8 = 1;
    const PAD_BYTE: u8 = 0;

    // Configuration of the default test pool; the stride equals the buffer
    // length and no minimum tx data length is required.
    const DEFAULT_CONFIG: Config = Config {
        buffer_stride: NonZeroU64::new(DEFAULT_BUFFER_LENGTH.get() as u64).unwrap(),
        num_rx_buffers: DEFAULT_RX_BUFFERS,
        num_tx_buffers: DEFAULT_TX_BUFFERS,
        options: netdev::SessionFlags::empty(),
        buffer_layout: BufferLayout {
            length: DEFAULT_BUFFER_LENGTH.get(),
            min_tx_head: DEFAULT_MIN_TX_BUFFER_HEAD,
            min_tx_tail: DEFAULT_MIN_TX_BUFFER_TAIL,
            min_tx_data: 0,
        },
    };
1309
    // Test-only helpers on `Pool`.
    impl Pool {
        /// Creates a pool from `DEFAULT_CONFIG`, discarding the VMO handles.
        fn new_test_default() -> Arc<Self> {
            let (pool, _descriptors, _data) =
                Pool::new(DEFAULT_CONFIG).expect("failed to create default pool");
            pool
        }

        /// Allocates a tx chain of `n` parts, panicking if `n` is not a valid
        /// chain length.
        async fn alloc_tx_checked(self: &Arc<Self>, n: u8) -> AllocGuard<Tx> {
            self.alloc_tx(ChainLength::try_from(n).expect("failed to convert to chain length"))
                .await
        }

        /// Allocates a tx chain of `n` parts only if that can be done without
        /// waiting.
        fn alloc_tx_now_or_never(self: &Arc<Self>, n: u8) -> Option<AllocGuard<Tx>> {
            self.alloc_tx_checked(n).now_or_never()
        }

        /// Allocates chains of `n` parts until an allocation would have to wait.
        fn alloc_tx_all(self: &Arc<Self>, n: u8) -> Vec<AllocGuard<Tx>> {
            std::iter::from_fn(|| self.alloc_tx_now_or_never(n)).collect()
        }

        /// Allocates a tx buffer of `num_bytes` only if that can be done
        /// without waiting; panics if the allocation itself reports an error.
        fn alloc_tx_buffer_now_or_never(self: &Arc<Self>, num_bytes: usize) -> Option<Buffer<Tx>> {
            self.alloc_tx_buffer(num_bytes)
                .now_or_never()
                .transpose()
                .expect("invalid arguments for alloc_tx_buffer")
        }

        /// Overrides `min_tx_data`; requires the `Arc` to be uniquely owned.
        fn set_min_tx_buffer_length(self: &mut Arc<Self>, length: usize) {
            Arc::get_mut(self).unwrap().buffer_layout.min_tx_data = length;
        }

        /// Fills the whole data mapping with `SENTINEL_BYTE`.
        fn fill_sentinel_bytes(&mut self) {
            // SAFETY: `base` points at a mapping of `bytes` bytes created in
            // `Pool::new`; NOTE(review): `&mut self` is taken as evidence that
            // no buffers are outstanding — confirm at call sites.
            unsafe { std::ptr::write_bytes(self.base.as_ptr(), SENTINEL_BYTE, self.bytes) };
        }
    }
1347
    // Test-only helper on tx buffers.
    impl Buffer<Tx> {
        /// Writes one `WRITE_BYTE` at `offset` and checks the whole buffer:
        /// its length must be `pad_size`, bytes before `offset` must still be
        /// `SENTINEL_BYTE`, and bytes after it must be `PAD_BYTE`.
        fn check_write_and_pad(&mut self, offset: usize, pad_size: usize) {
            {
                let mut io = self.io_mut();
                assert_eq!(io.write_at(offset, &[WRITE_BYTE][..]), 1);
            }
            assert_eq!(self.len(), pad_size);
            // Pre-fill the read buffer with a value distinct from the three
            // marker bytes so that unread bytes fail the checks below.
            const INIT_BYTE: u8 = 42;
            let mut read_buf = vec![INIT_BYTE; pad_size];
            assert_eq!(self.io().read_at(0, &mut read_buf[..]), read_buf.len());
            for (idx, byte) in read_buf.iter().enumerate() {
                if idx < offset {
                    assert_eq!(*byte, SENTINEL_BYTE);
                } else if idx == offset {
                    assert_eq!(*byte, WRITE_BYTE);
                } else {
                    assert_eq!(*byte, PAD_BYTE);
                }
            }
        }
    }
1374
    // Lets tests compare a descriptor chain against any copyable collection
    // of raw `u16` ids, element by element and with equal length.
    impl<K, I, T> PartialEq<T> for Chained<DescId<K>>
    where
        K: AllocKind,
        I: ExactSizeIterator<Item = u16>,
        T: Copy + IntoIterator<IntoIter = I>,
    {
        fn eq(&self, other: &T) -> bool {
            let iter = other.into_iter();
            if usize::from(self.len) != iter.len() {
                return false;
            }
            self.iter().zip(iter).all(|(l, r)| l.get() == r)
        }
    }
1389
    // Test-only `Debug` impl showing the requested size; the sender is
    // omitted and the output is marked non-exhaustive.
    impl Debug for TxAllocReq {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            let TxAllocReq { sender: _, size } = self;
            f.debug_struct("TxAllocReq").field("size", &size).finish_non_exhaustive()
        }
    }
1396
1397 #[test]
1398 fn alloc_tx_distinct() {
1399 let pool = Pool::new_test_default();
1400 let allocated = pool.alloc_tx_all(1);
1401 assert_eq!(allocated.len(), DEFAULT_TX_BUFFERS.get().into());
1402 let distinct = allocated
1403 .iter()
1404 .map(|alloc| {
1405 assert_eq!(alloc.descs.len(), 1);
1406 alloc.descs[0].get()
1407 })
1408 .collect::<HashSet<u16>>();
1409 assert_eq!(allocated.len(), distinct.len());
1410 }
1411
/// The free list length drops to zero while every descriptor is allocated
/// and returns to `DEFAULT_TX_BUFFERS` once the allocations are dropped.
#[test]
fn alloc_tx_free_len() {
    let pool = Pool::new_test_default();
    let allocated = pool.alloc_tx_all(2);

    let mut total_descs = 0;
    for guard in allocated.iter() {
        total_descs += guard.descs.len();
    }
    assert_eq!(total_descs, DEFAULT_TX_BUFFERS.get().into());
    assert_eq!(pool.tx_alloc_state.lock().free_list.len, 0);

    // Dropping the guards hands every descriptor back to the pool.
    drop(allocated);
    assert_eq!(pool.tx_alloc_state.lock().free_list.len, DEFAULT_TX_BUFFERS.get());
}
1425
/// After the pool has been drained in chains of three, a further chain of
/// three cannot be allocated but a chain of two still can.
#[test]
fn alloc_tx_chain() {
    let pool = Pool::new_test_default();
    let chains = pool.alloc_tx_all(3);
    let expected_chains = usize::from(DEFAULT_TX_BUFFERS.get()) / 3;
    assert_eq!(chains.len(), expected_chains);

    let another_triple = pool.alloc_tx_now_or_never(3);
    assert_matches!(another_triple, None);

    let leftover_pair = pool.alloc_tx_now_or_never(2);
    assert_matches!(leftover_pair, Some(a) if a.descs.len() == 2);
}
1434
/// Exercises `alloc_tx_buffers`: one call hands out every tx buffer, a second
/// call is not ready while the pool is empty, and after one buffer is
/// released a new call yields exactly one buffer.
#[test]
fn alloc_tx_many() {
    let pool = Pool::new_test_default();
    // Payload size that remains in one buffer after the minimum head and
    // tail have been subtracted.
    let data_len = usize::try_from(
        u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
            - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
            - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL),
    )
    .unwrap();

    let first_batch = pool.alloc_tx_buffers(data_len).now_or_never().expect("failed to alloc");
    let mut buffers =
        first_batch.unwrap().collect::<Result<Vec<_>>>().expect("buffer error");
    assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()));

    // Nothing is left, so the allocation future must not complete.
    assert!(pool.alloc_tx_buffers(data_len).now_or_never().is_none());

    // The popped buffer is a temporary and is dropped right here.
    assert_matches!(buffers.pop(), Some(_));

    let mut more_buffers =
        pool.alloc_tx_buffers(data_len).now_or_never().expect("failed to alloc").unwrap();
    let buffer = assert_matches!(more_buffers.next(), Some(Ok(b)) => b);
    assert_matches!(more_buffers.next(), None);
    // Releasing the buffer does not make the finished iterator yield again.
    drop(buffer);
    assert_matches!(more_buffers.next(), None);
}
1468
/// A chain of two cannot be allocated from an exhausted pool, but can be
/// once two single-descriptor allocations have been dropped.
#[test]
fn alloc_tx_after_free() {
    let pool = Pool::new_test_default();
    let mut singles = pool.alloc_tx_all(1);
    assert_matches!(pool.alloc_tx_now_or_never(2), None);

    // Dropping the drain iterator drops the two guards it removed.
    drop(singles.drain(..2));

    assert_matches!(pool.alloc_tx_now_or_never(2), Some(a) if a.descs.len() == 2);
}
1479
/// A pending allocation on an exhausted pool is fulfilled with exactly the
/// descriptor that gets freed next, and its request is removed from the
/// queue afterwards.
#[test]
fn blocking_alloc_tx() {
    let mut executor = fasync::TestExecutor::new();
    let pool = Pool::new_test_default();
    let mut allocated = pool.alloc_tx_all(1);

    let mut pending = pin!(pool.alloc_tx_checked(1));
    assert_matches!(executor.run_until_stalled(&mut pending), Poll::Pending);
    // The stalled allocation must have left a request behind.
    assert!(!pool.tx_alloc_state.lock().requests.is_empty());

    // Record the raw indices of one allocation, then let its guard drop.
    let released: Chained<_> = {
        let guard = allocated.pop().expect("no fulfulled allocations");
        let raw_ids = guard.iter().map(|desc| desc.get()).collect();
        raw_ids
    };
    let same_as_freed = |descs: &Chained<DescId<Tx>>| {
        descs.iter().map(|desc| desc.get()).eq(released.iter().copied())
    };

    assert_matches!(
        &executor.run_until_stalled(&mut pending),
        Poll::Ready(AllocGuard{ descs, pool: _ }) if same_as_freed(descs)
    );
    assert!(pool.tx_alloc_state.lock().requests.is_empty());
}
1507
/// Dropping a pending allocation future *before* a descriptor is freed
/// leaves the freed descriptor on the free list and the request queue empty.
#[test]
fn blocking_alloc_tx_cancel_before_free() {
    let mut executor = fasync::TestExecutor::new();
    let pool = Pool::new_test_default();
    let mut allocated = pool.alloc_tx_all(1);
    // Raw index expected on the allocation popped below.
    let last_desc = [DEFAULT_TX_BUFFERS.get() - 1];

    {
        let mut pending = pin!(pool.alloc_tx_checked(1));
        assert_matches!(executor.run_until_stalled(&mut pending), Poll::Pending);
        // Two size-1 requests are queued at this point.
        assert_matches!(
            pool.tx_alloc_state.lock().requests.as_slices(),
            (&[ref first, ref second], &[]) if first.size.get() == 1 && second.size.get() == 1
        );
        // `pending` goes out of scope here, before anything is freed.
    }

    assert_matches!(
        allocated.pop(),
        Some(AllocGuard { ref descs, pool: ref p })
            if descs == &last_desc && Arc::ptr_eq(p, &pool)
    );

    let alloc_state = pool.tx_alloc_state.lock();
    assert_eq!(alloc_state.free_list.len, 1);
    assert!(alloc_state.requests.is_empty());
}
1531
/// Dropping a pending allocation future *after* a descriptor was freed (but
/// without polling it again) leaves the descriptor on the free list and the
/// request queue empty.
#[test]
fn blocking_alloc_tx_cancel_after_free() {
    let mut executor = fasync::TestExecutor::new();
    let pool = Pool::new_test_default();
    let mut allocated = pool.alloc_tx_all(1);
    // Raw index expected on the allocation popped below.
    let last_desc = [DEFAULT_TX_BUFFERS.get() - 1];

    {
        let mut pending = pin!(pool.alloc_tx_checked(1));
        assert_matches!(executor.run_until_stalled(&mut pending), Poll::Pending);
        // Two size-1 requests are queued at this point.
        assert_matches!(
            pool.tx_alloc_state.lock().requests.as_slices(),
            (&[ref first, ref second], &[]) if first.size.get() == 1 && second.size.get() == 1
        );
        // Free one descriptor while the future is still alive.
        assert_matches!(
            allocated.pop(),
            Some(AllocGuard { ref descs, pool: ref p })
                if descs == &last_desc && Arc::ptr_eq(p, &pool)
        );
        // `pending` is dropped here without having been polled again.
    }

    let alloc_state = pool.tx_alloc_state.lock();
    assert_eq!(alloc_state.free_list.len, 1);
    assert!(alloc_state.requests.is_empty());
}
1555
/// Queues `TASKS_TOTAL` blocked allocations of sizes 3, 2, 1 (in that order)
/// against an exhausted pool and checks that they are fulfilled strictly in
/// queue order as descriptors are freed one at a time.
#[test]
fn multiple_blocking_alloc_tx_fulfill_order() {
    const TASKS_TOTAL: usize = 3;
    let mut executor = fasync::TestExecutor::new();
    let pool = Pool::new_test_default();
    let mut allocated = pool.alloc_tx_all(1);
    // One boxed future per request size, largest request first.
    let mut alloc_futs = (1..=TASKS_TOTAL)
        .rev()
        .map(|x| {
            let pool = pool.clone();
            (x, Box::pin(async move { pool.alloc_tx_checked(x.try_into().unwrap()).await }))
        })
        .collect::<Vec<_>>();

    // Poll every future once so each one registers its request; none can
    // complete. The second assertion checks the sizes really descend.
    for (idx, (req_size, task)) in alloc_futs.iter_mut().enumerate() {
        assert_matches!(executor.run_until_stalled(task), Poll::Pending);
        assert_eq!(idx + *req_size, TASKS_TOTAL);
    }
    {
        let state = pool.tx_alloc_state.lock();
        // One request more than we queued: the first entry is a canceled one
        // (presumably left behind by `alloc_tx_all` -- TODO confirm), all the
        // others are still live.
        assert_eq!(state.requests.len(), TASKS_TOTAL + 1);
        let mut requests = state.requests.iter();
        assert!(requests.next().unwrap().sender.is_canceled());
        assert!(requests.all(|req| !req.sender.is_canceled()))
    }

    // Fulfilled allocations are parked here so their descriptors are not
    // returned to the pool while the test is still running.
    let mut to_free = Vec::new();
    // Number of descriptors released so far; also used to compute the raw
    // index expected on the next popped allocation.
    let mut freed = 0;
    for free_size in (1..=TASKS_TOTAL).rev() {
        // The task at the front is the one requesting `free_size` descriptors.
        let (_req_size, mut task) = alloc_futs.remove(0);
        // Release all but one of the descriptors it needs: it must stay
        // pending after each of them.
        for _ in 1..free_size {
            freed += 1;
            assert_matches!(
                allocated.pop(),
                Some(AllocGuard { ref descs, pool: ref p })
                    if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
            );
            assert_matches!(executor.run_until_stalled(&mut task), Poll::Pending);
        }
        // Releasing the final descriptor must complete the front task.
        freed += 1;
        assert_matches!(
            allocated.pop(),
            Some(AllocGuard { ref descs, pool: ref p })
                if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
        );
        match executor.run_until_stalled(&mut task) {
            Poll::Ready(alloc) => {
                assert_eq!(alloc.len(), free_size);
                to_free.push(alloc);
            }
            Poll::Pending => panic!("The request should be fulfilled"),
        }
        // Every task queued behind it must still be waiting.
        for (_req_size, task) in alloc_futs.iter_mut() {
            assert_matches!(executor.run_until_stalled(task), Poll::Pending);
        }
    }
    assert!(pool.tx_alloc_state.lock().requests.is_empty());
}
1621
/// Allocates single-descriptor tx buffers until the pool is exhausted and
/// checks each descriptor's head/tail/data lengths and offset, as well as the
/// address and length of the one data slice it exposes.
#[test]
fn singleton_tx_layout() {
    let pool = Pool::new_test_default();
    // Payload size that remains in one buffer after the minimum head and
    // tail have been subtracted.
    let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
        - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
        - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
    let payload_len = usize::try_from(data_len).unwrap();
    let buffer_len = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap();

    let buffers = std::iter::from_fn(|| {
        // `None` from the allocator ends the iteration.
        let buffer = pool.alloc_tx_buffer_now_or_never(payload_len)?;
        assert_eq!(buffer.alloc.descriptors().count(), 1);
        let offset = buffer_len * u64::from(buffer.alloc[0].get());
        {
            let descriptor = buffer.alloc.descriptor();
            assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
            assert_eq!(descriptor.head_length(), DEFAULT_MIN_TX_BUFFER_HEAD);
            assert_eq!(descriptor.tail_length(), DEFAULT_MIN_TX_BUFFER_TAIL);
            assert_eq!(descriptor.data_length(), data_len);
            assert_eq!(descriptor.offset(), offset);
        }

        {
            let mut slices = buffer.parts();
            let only_slice = slices.next().expect("should have one slice");
            assert_matches!(slices.next(), None);
            assert_eq!(only_slice.len(), payload_len);
            // The data starts right after the head region of the buffer.
            let data_start =
                usize::try_from(offset).unwrap() + usize::from(DEFAULT_MIN_TX_BUFFER_HEAD);
            assert_eq!(only_slice.as_ptr(), pool.base.as_ptr().wrapping_add(data_start));
        }
        Some(buffer)
    })
    .collect::<Vec<_>>();
    assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()));
}
1661
/// Allocates four-descriptor tx buffers until the pool is exhausted and
/// checks the layout of every descriptor in each chain: the head is reserved
/// only in the first part, the tail only in the last, and each part links to
/// the next one.
#[test]
fn chained_tx_layout() {
    const CHAIN_PARTS: usize = 4;
    let pool = Pool::new_test_default();
    let alloc_len = CHAIN_PARTS * DEFAULT_BUFFER_LENGTH.get()
        - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
        - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL);

    let buffers = std::iter::from_fn(|| {
        // `None` from the allocator ends the iteration.
        let buffer = pool.alloc_tx_buffer_now_or_never(alloc_len)?;
        assert_eq!(buffer.parts().count(), CHAIN_PARTS);
        for (idx, (descriptor, slice)) in
            buffer.alloc.descriptors().zip(buffer.parts()).enumerate()
        {
            // Number of descriptors that follow this one in the chain.
            let chain_length = ChainLength::try_from(buffer.alloc.len() - idx - 1).unwrap();
            let is_last = chain_length == ChainLength::ZERO;
            let head_length = if idx == 0 { DEFAULT_MIN_TX_BUFFER_HEAD } else { 0 };
            let tail_length = if is_last { DEFAULT_MIN_TX_BUFFER_TAIL } else { 0 };
            let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
                - u32::from(head_length)
                - u32::from(tail_length);
            let offset = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
                * u64::from(buffer.alloc[idx].get());

            assert_eq!(descriptor.chain_length().unwrap(), chain_length);
            assert_eq!(descriptor.head_length(), head_length);
            assert_eq!(descriptor.tail_length(), tail_length);
            assert_eq!(descriptor.offset(), offset);
            assert_eq!(descriptor.data_length(), data_len);
            if !is_last {
                assert_eq!(descriptor.nxt(), Some(buffer.alloc[idx + 1].get()));
            }

            assert_eq!(slice.len(), usize::try_from(data_len).unwrap());
            let data_start = usize::try_from(offset).unwrap() + usize::from(head_length);
            assert_eq!(slice.as_ptr(), pool.base.as_ptr().wrapping_add(data_start));
        }
        Some(buffer)
    })
    .collect::<Vec<_>>();
    assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()) / CHAIN_PARTS);
}
1709
/// A fresh pool holds `DEFAULT_RX_BUFFERS` pending rx descriptors and their
/// indices are all different.
#[test]
fn rx_distinct() {
    let pool = Pool::new_test_default();
    let mut guard = pool.rx_pending.inner.lock();
    let (pending, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
    assert_eq!(pending.len(), usize::from(DEFAULT_RX_BUFFERS.get()));

    let mut seen = HashSet::<u16>::new();
    for desc in pending.iter() {
        seen.insert(desc.get());
    }
    // A duplicate index would collapse in the set and shrink its size.
    assert_eq!(pending.len(), seen.len());
}
1719
/// Every pending rx descriptor of a fresh pool is a standalone descriptor
/// with no head or tail, spanning one whole buffer at the offset derived
/// from its index.
#[test]
fn alloc_rx_layout() {
    let pool = Pool::new_test_default();
    let mut guard = pool.rx_pending.inner.lock();
    let (pending, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
    assert_eq!(pending.len(), usize::from(DEFAULT_RX_BUFFERS.get()));

    let buffer_len = DEFAULT_BUFFER_LENGTH.get();
    for desc in pending.iter() {
        let descriptor = pool.descriptors.borrow(desc);
        let expected_offset = u64::try_from(buffer_len).unwrap() * u64::from(desc.get());
        assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
        assert_eq!(descriptor.head_length(), 0);
        assert_eq!(descriptor.tail_length(), 0);
        assert_eq!(descriptor.offset(), expected_offset);
        assert_eq!(descriptor.data_length(), u32::try_from(buffer_len).unwrap());
    }
}
1740
/// Data written with `write_at` across a two-part buffer is read back
/// unchanged with `read_at`.
#[test]
fn buffer_read_at_write_at() {
    let pool = Pool::new_test_default();
    let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
    let mut buffer =
        pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
    // A full buffer length of payload is expected to span two parts.
    assert_eq!(buffer.parts().count(), 2);
    assert_eq!(buffer.len(), alloc_bytes);

    // Ascending byte pattern 0, 1, 2, ... covering the whole buffer.
    let pattern = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();
    assert_eq!(buffer.io_mut().write_at(0, &pattern[..]), pattern.len());

    let mut readback = [0xff; DEFAULT_BUFFER_LENGTH.get()];
    assert_eq!(buffer.io().read_at(0, &mut readback[..]), readback.len());
    for (actual, expected) in readback.iter().zip(pattern.iter()) {
        assert_eq!(*actual, *expected);
    }
}
1759
/// `write_at` writes only as many bytes as fit: it truncates an over-long
/// source, writes nothing past the end, and writes a partial tail when
/// starting in the middle of the buffer.
#[test]
fn buffer_write_at_short() {
    // Value used for the second, partial overwrite.
    const OVERWRITE_BYTE: u8 = 2;

    let pool = Pool::new_test_default();
    let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
    let mut buffer =
        pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
    assert_eq!(buffer.parts().count(), 2);
    assert_eq!(buffer.len(), alloc_bytes);

    // The source is longer than the buffer: only `alloc_bytes` get written.
    let oversized = vec![WRITE_BYTE; alloc_bytes + 10];
    assert_eq!(buffer.io_mut().write_at(0, &oversized[..]), alloc_bytes);

    let mut readback = vec![0; alloc_bytes];
    assert_eq!(buffer.io().read_at(0, &mut readback[..]), alloc_bytes);
    for byte in &readback {
        assert_eq!(*byte, WRITE_BYTE);
    }

    // Starting beyond the end writes nothing.
    assert_eq!(buffer.io_mut().write_at(alloc_bytes + 1, &oversized[..]), 0);

    // Starting halfway through writes only the remaining half.
    let offset = alloc_bytes / 2;
    let expected_write = alloc_bytes - offset;
    let overwrite = vec![OVERWRITE_BYTE; alloc_bytes];
    assert_eq!(buffer.io_mut().write_at(offset, &overwrite[..]), expected_write);

    let mut readback = vec![0; alloc_bytes];
    assert_eq!(buffer.io().read_at(0, &mut readback[..]), alloc_bytes);
    let (untouched, overwritten) = readback.split_at(offset);
    for byte in untouched {
        assert_eq!(*byte, WRITE_BYTE);
    }
    for byte in overwritten {
        assert_eq!(*byte, OVERWRITE_BYTE);
    }
}
1801
/// `read_at` reads only as many bytes as are available: it leaves the excess
/// of an over-long destination untouched, reads nothing past the end, and
/// reads a partial tail when starting in the middle of the buffer.
#[test]
fn buffer_read_at_short() {
    // Pre-fill value marking destination bytes that `read_at` did not touch.
    const UNTOUCHED: u8 = 0xff;

    let pool = Pool::new_test_default();
    let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
    let mut buffer =
        pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
    assert_eq!(buffer.parts().count(), 2);
    assert_eq!(buffer.len(), alloc_bytes);

    let contents = vec![WRITE_BYTE; alloc_bytes];
    assert_eq!(buffer.io_mut().write_at(0, &contents[..]), alloc_bytes);

    // The destination is longer than the buffer: only `alloc_bytes` are read.
    let mut readback = vec![UNTOUCHED; alloc_bytes + 10];
    assert_eq!(buffer.io().read_at(0, &mut readback[..]), alloc_bytes);
    let (filled, rest) = readback.split_at(alloc_bytes);
    for byte in filled {
        assert_eq!(*byte, WRITE_BYTE);
    }
    for byte in rest {
        assert_eq!(*byte, UNTOUCHED);
    }

    // Starting beyond the end reads nothing.
    assert_eq!(buffer.io().read_at(alloc_bytes + 1, &mut readback[..]), 0);

    // Starting halfway through reads only the remaining half.
    let offset = alloc_bytes / 2;
    let expected_read = alloc_bytes - offset;
    let mut readback = vec![UNTOUCHED; alloc_bytes];
    assert_eq!(buffer.io().read_at(offset, &mut readback[..]), expected_read);
    let (filled, rest) = readback.split_at(expected_read);
    for byte in filled {
        assert_eq!(*byte, WRITE_BYTE);
    }
    for byte in rest {
        assert_eq!(*byte, UNTOUCHED);
    }
}
1841
/// Exercises the `Write`, `Seek` and `Read` implementations of the buffer's
/// mutable I/O view: writes an ascending byte pattern, seeks backwards from
/// the end, and checks that the bytes read are the ones stored at the seek
/// position.
#[test]
fn buffer_read_write_seek() {
    let pool = Pool::new_test_default();
    let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
    let mut buffer =
        pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
    // A full buffer length of payload is expected to span two parts.
    assert_eq!(buffer.parts().count(), 2);
    assert_eq!(buffer.len(), alloc_bytes);
    // Ascending byte pattern 0, 1, 2, ... so every position is identifiable.
    let write_buf = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();

    let mut io = buffer.io_mut();

    assert_eq!(io.write(&write_buf[..]).expect("failed to write into buffer"), write_buf.len());
    const SEEK_FROM_END: usize = 64;
    const READ_LEN: usize = 12;
    let position = io.seek(SeekFrom::End(-i64::try_from(SEEK_FROM_END).unwrap())).unwrap();
    assert_eq!(position, u64::try_from(io.len - SEEK_FROM_END).unwrap());
    let mut read_buf = [0xff; READ_LEN];
    assert_eq!(io.read(&mut read_buf[..]).expect("failed to read from buffer"), read_buf.len());
    // Compare against the bytes that were written at the seek position. The
    // previous `write_buf[..READ_LEN]` was only right when the seek landed on
    // offset zero, i.e. when `SEEK_FROM_END` equals the buffer length.
    let start = usize::try_from(position).unwrap();
    assert_eq!(&write_buf[start..start + READ_LEN], &read_buf[..]);
}
1867
1868 #[test_case(32; "single buffer part")]
1869 #[test_case(MAX_BUFFER_BYTES; "multiple buffer parts")]
1870 fn buffer_pad(pad_size: usize) {
1871 let mut pool = Pool::new_test_default();
1872 pool.set_min_tx_buffer_length(pad_size);
1873 for offset in 0..pad_size {
1874 Arc::get_mut(&mut pool)
1875 .expect("there are multiple owners of the underlying VMO")
1876 .fill_sentinel_bytes();
1877 let mut buffer =
1878 pool.alloc_tx_buffer_now_or_never(offset + 1).expect("failed to allocate buffer");
1879 buffer.check_write_and_pad(offset, pad_size);
1880 }
1881 }
1882
/// Builds buffers by hand from a chain of `BUFFER_PARTS` descriptors that is
/// initialized for a short length, and verifies the written byte and the
/// padding out to the configured minimum tx length.
#[test]
fn buffer_pad_grow() {
    const BUFFER_PARTS: u8 = 3;
    let mut pool = Pool::new_test_default();
    let buffer_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap();
    // Everything three buffers can carry once head and tail are subtracted.
    let pad_size = buffer_len * u32::from(BUFFER_PARTS)
        - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
        - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
    let pad_len = usize::try_from(pad_size).unwrap();
    pool.set_min_tx_buffer_length(pad_len);

    for offset in 0..pad_size - buffer_len {
        let offset = usize::try_from(offset).unwrap();
        // Reset the backing memory so stale data cannot satisfy the checks.
        let exclusive = Arc::get_mut(&mut pool)
            .expect("there are multiple owners of the underlying VMO");
        exclusive.fill_sentinel_bytes();
        let mut alloc =
            pool.alloc_tx_now_or_never(BUFFER_PARTS).expect("failed to alloc descriptors");
        alloc
            .init(offset + 1)
            .expect("head/body/tail sizes are representable with u16/u32/u16");
        let mut buffer = Buffer::try_from(alloc).unwrap();
        buffer.check_write_and_pad(offset, pad_len);
    }
}
1905
1906 #[test_case( 0; "writes at the beginning")]
1907 #[test_case( 15; "writes in the first part")]
1908 #[test_case( 75; "writes in the second part")]
1909 #[test_case(135; "writes in the third part")]
1910 #[test_case(195; "writes in the last part")]
1911 fn buffer_used(write_offset: usize) {
1912 let pool = Pool::new_test_default();
1913 let mut buffer =
1914 pool.alloc_tx_buffer_now_or_never(MAX_BUFFER_BYTES).expect("failed to allocate buffer");
1915 let expected_caps = (0..netdev::MAX_DESCRIPTOR_CHAIN).map(|i| {
1916 if i == 0 {
1917 DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1918 } else if i < netdev::MAX_DESCRIPTOR_CHAIN - 1 {
1919 DEFAULT_BUFFER_LENGTH.get()
1920 } else {
1921 DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL)
1922 }
1923 });
1924 assert_eq!(buffer.alloc.len(), netdev::MAX_DESCRIPTOR_CHAIN.into());
1925 assert_eq!(buffer.io_mut().write_at(write_offset, &[WRITE_BYTE][..]), 1);
1926 assert_eq!(
1929 buffer.parts().zip(expected_caps).fold(
1930 Some(write_offset),
1931 |offset, (slice, expected_cap)| {
1932 assert_eq!(slice.len(), expected_cap);
1933 match offset {
1934 Some(offset) => {
1935 if offset >= expected_cap {
1936 Some(offset - slice.len())
1937 } else {
1938 assert_eq!(slice[offset], WRITE_BYTE);
1939 None
1940 }
1941 }
1942 None => None,
1943 }
1944 }
1945 ),
1946 None
1947 );
1948 }
1949
/// Requesting fewer bytes than the configured minimum tx data length still
/// yields a buffer of the minimum length, and reads and writes on it are
/// bounded by that length.
#[test]
fn allocate_under_device_minimum() {
    const MIN_TX_DATA: usize = 32;
    const ALLOC_SIZE: usize = 16;
    const WRITE_BYTE: u8 = 0xff;
    const WRITE_SENTINEL_BYTE: u8 = 0xee;
    const READ_SENTINEL_BYTE: u8 = 0xdd;
    const WRITE_BUF_SIZE: usize = MIN_TX_DATA + 1;
    const READ_BUF_SIZE: usize = MIN_TX_DATA + 1;

    let mut config = DEFAULT_CONFIG;
    config.buffer_layout.min_tx_data = MIN_TX_DATA;
    let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create a new pool");

    // Allocate every buffer first, then stamp each with the write sentinel;
    // the buffers are released one by one as the loop consumes the vector.
    let every_buffer = std::iter::from_fn(|| pool.alloc_tx_buffer_now_or_never(MIN_TX_DATA))
        .collect::<Vec<_>>();
    for mut buffer in every_buffer {
        assert_eq!(
            buffer.io_mut().write_at(0, &[WRITE_SENTINEL_BYTE; MIN_TX_DATA]),
            MIN_TX_DATA
        );
    }

    let mut allocated =
        pool.alloc_tx_buffer_now_or_never(ALLOC_SIZE).expect("failed to allocate buffer");
    assert_eq!(allocated.len(), MIN_TX_DATA);
    // A write that is one byte too long is cut off at the buffer length.
    assert_eq!(allocated.io_mut().write_at(0, &[WRITE_BYTE; WRITE_BUF_SIZE]), MIN_TX_DATA);
    assert_eq!(allocated.io_mut().write_at(0, &[WRITE_BYTE; ALLOC_SIZE]), ALLOC_SIZE);
    assert_eq!(allocated.len(), MIN_TX_DATA);

    // A read into a destination that is one byte too long leaves the extra
    // byte untouched.
    let mut read_buf = [READ_SENTINEL_BYTE; READ_BUF_SIZE];
    assert_eq!(allocated.io().read_at(0, &mut read_buf[..]), MIN_TX_DATA);
    assert_eq!(allocated.io().read_at(0, &mut read_buf[..MIN_TX_DATA]), MIN_TX_DATA);
    assert_eq!(&read_buf[..ALLOC_SIZE], &[WRITE_BYTE; ALLOC_SIZE][..]);
    assert_eq!(
        &read_buf[ALLOC_SIZE..MIN_TX_DATA],
        &[WRITE_BYTE; MIN_TX_DATA - ALLOC_SIZE][..]
    );
    assert_eq!(&read_buf[MIN_TX_DATA..], &[READ_SENTINEL_BYTE; 1][..]);
}
1984
/// With a buffer length of `u16::MAX + 2` and no minimum head, allocating a
/// tx buffer fails with `Error::TxLength`.
#[test]
fn invalid_tx_length() {
    let oversized_length = usize::from(u16::MAX) + 2;
    let mut config = DEFAULT_CONFIG;
    config.buffer_layout.length = oversized_length;
    config.buffer_layout.min_tx_head = 0;
    let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create pool");

    let outcome = pool.alloc_tx_buffer(1).now_or_never();
    assert_matches!(outcome, Some(Err(Error::TxLength)));
}
1993
/// Checks how `RxLeaseWatcher::wait_until` reacts to `rx_complete` calls on
/// the shared state, and that dropping a pending wait leaves the frame
/// counter unchanged.
#[test]
fn rx_leases() {
    let mut executor = fuchsia_async::TestExecutor::new();
    let state = RxLeaseHandlingState::new_with_enabled(true);
    let mut watcher = RxLeaseWatcher { state: &state };
    let frame_counter = || state.rx_frame_counter.load(atomic::Ordering::SeqCst);

    // Waiting for zero completes without any rx completion.
    {
        let mut wait = pin!(watcher.wait_until(0));
        assert_eq!(executor.run_until_stalled(&mut wait), Poll::Ready(()));
    }
    // One completion recorded beforehand satisfies a wait for one.
    {
        state.rx_complete();
        let mut wait = pin!(watcher.wait_until(1));
        assert_eq!(executor.run_until_stalled(&mut wait), Poll::Ready(()));
    }
    // Waiting for zero again still completes immediately.
    {
        let mut wait = pin!(watcher.wait_until(0));
        assert_eq!(executor.run_until_stalled(&mut wait), Poll::Ready(()));
    }
    // A wait for three stays pending until the second further completion.
    {
        let mut wait = pin!(watcher.wait_until(3));
        assert_eq!(executor.run_until_stalled(&mut wait), Poll::Pending);
        state.rx_complete();
        assert_eq!(executor.run_until_stalled(&mut wait), Poll::Pending);
        state.rx_complete();
        assert_eq!(executor.run_until_stalled(&mut wait), Poll::Ready(()));
    }

    // Dropping a wait that never completes must not disturb the counter.
    let counter_before = frame_counter();
    {
        let mut wait = pin!(watcher.wait_until(10000));
        assert_eq!(executor.run_until_stalled(&mut wait), Poll::Pending);
    }
    let counter_after = frame_counter();
    assert_eq!(counter_before, counter_after);
}
2031}