netdevice_client/session/buffer/pool.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Fuchsia netdevice buffer pool.
6
7use fuchsia_sync::Mutex;
8use futures::task::AtomicWaker;
9use std::borrow::Borrow;
10use std::collections::VecDeque;
11use std::convert::TryInto as _;
12use std::fmt::Debug;
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::mem::{ManuallyDrop, MaybeUninit};
15use std::num::TryFromIntError;
16use std::ops::{Deref, DerefMut};
17use std::ptr::NonNull;
18use std::sync::atomic::{self, AtomicBool, AtomicU64};
19use std::sync::Arc;
20use std::task::Poll;
21
22use explicit::ResultExt as _;
23use fidl_fuchsia_hardware_network as netdev;
24use fuchsia_runtime::vmar_root_self;
25use futures::channel::oneshot::{channel, Receiver, Sender};
26
27use super::{ChainLength, DescId, DescRef, DescRefMut, Descriptors};
28use crate::error::{Error, Result};
29use crate::session::{BufferLayout, Config, Pending, Port};
30
31/// Responsible for managing [`Buffer`]s for a [`Session`](crate::session::Session).
32pub(in crate::session) struct Pool {
33    /// Base address of the pool.
34    // Note: This field requires us to manually implement `Sync` and `Send`.
35    base: NonNull<u8>,
36    /// The length of the pool in bytes.
37    bytes: usize,
38    /// The descriptors allocated for the pool.
39    descriptors: Descriptors,
40    /// Shared state for allocation.
41    tx_alloc_state: Mutex<TxAllocState>,
42    /// The free rx descriptors pending to be sent to driver.
43    pub(in crate::session) rx_pending: Pending<Rx>,
44    /// The buffer layout.
45    buffer_layout: BufferLayout,
46    /// State that allows sessions to handle rx leases.
47    rx_leases: RxLeaseHandlingState,
48}
49
50// `Pool` is `Send` and `Sync`, and this allows the compiler to deduce `Buffer`
51// to be `Send`. These impls are safe because we can safely share `Pool` and
52// `&Pool`: the implementation would never allocate the same buffer to two
53// callers at the same time.
54unsafe impl Send for Pool {}
55unsafe impl Sync for Pool {}
56
57/// The shared state which keeps track of free tx descriptors and pending tx allocation requests.
58struct TxAllocState {
59    /// All pending tx allocation requests.
60    requests: VecDeque<TxAllocReq>,
61    free_list: TxFreeList,
62}
63
64/// We use a linked list to maintain the tx free descriptors: they are linked
65/// through their `nxt` fields. Note that this differs from the chaining
66/// expected by the network device protocol:
67/// - the free list can chain more than [`netdev::MAX_DESCRIPTOR_CHAIN`]
68///   descriptors together;
69/// - the free list ends when the `nxt` field is 0xff, while a normal chain
70///   ends when `chain_length` becomes 0.
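///
/// For example (an illustrative state, not taken from the code): if
/// descriptors 3, 7 and 9 are free, `head` is descriptor 3, descriptor 3's
/// `nxt` field points at 7, 7's points at 9, and 9's `nxt` is 0xff,
/// terminating the list.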
71struct TxFreeList {
72    /// The head of a linked list of available descriptors that can be allocated
73    /// for tx.
74    head: Option<DescId<Tx>>,
75    /// How many free descriptors are there in the pool.
76    len: u16,
77}
78
79impl Pool {
80    /// Creates a new [`Pool`] and its backing [`zx::Vmo`]s.
81    ///
82    /// Returns [`Pool`] and the [`zx::Vmo`]s for descriptors and data, in that
83    /// order.
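    ///
    /// A minimal usage sketch; the configuration values below are illustrative
    /// and error handling is elided:
    ///
    /// ```ignore
    /// use std::num::{NonZeroU16, NonZeroU64};
    ///
    /// let config = Config {
    ///     buffer_stride: NonZeroU64::new(2048).unwrap(),
    ///     num_rx_buffers: NonZeroU16::new(16).unwrap(),
    ///     num_tx_buffers: NonZeroU16::new(16).unwrap(),
    ///     options: netdev::SessionFlags::empty(),
    ///     buffer_layout: BufferLayout {
    ///         length: 2048,
    ///         min_tx_head: 16,
    ///         min_tx_tail: 16,
    ///         min_tx_data: 0,
    ///     },
    /// };
    /// let (pool, descriptors_vmo, data_vmo) = Pool::new(config)?;
    /// // The returned VMOs are handed to the device when the session is opened.
    /// ```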
84    pub(in crate::session) fn new(config: Config) -> Result<(Arc<Self>, zx::Vmo, zx::Vmo)> {
85        let Config { buffer_stride, num_rx_buffers, num_tx_buffers, options, buffer_layout } =
86            config;
87        let num_buffers = num_rx_buffers.get() + num_tx_buffers.get();
88        let (descriptors, descriptors_vmo, tx_free, mut rx_free) =
89            Descriptors::new(num_tx_buffers, num_rx_buffers, buffer_stride)?;
90
91        // Construct the free list.
92        let free_head = tx_free.into_iter().rev().fold(None, |head, mut curr| {
93            descriptors.borrow_mut(&mut curr).set_nxt(head);
94            Some(curr)
95        });
96
97        for rx_desc in rx_free.iter_mut() {
98            descriptors.borrow_mut(rx_desc).initialize(
99                ChainLength::ZERO,
100                0,
101                buffer_layout.length.try_into().unwrap(),
102                0,
103            );
104        }
105
106        let tx_alloc_state = TxAllocState {
107            free_list: TxFreeList { head: free_head, len: num_tx_buffers.get() },
108            requests: VecDeque::new(),
109        };
110
111        let size = buffer_stride.get() * u64::from(num_buffers);
112        let data_vmo = zx::Vmo::create(size).map_err(|status| Error::Vmo("data", status))?;
113        // `as` is OK because `size` is positive and smaller than isize::MAX.
114        // This is following the practice of rust stdlib to ensure allocation
115        // size never reaches isize::MAX.
116        // https://doc.rust-lang.org/std/primitive.pointer.html#method.add-1.
117        let len = isize::try_from(size).expect("VMO size larger than isize::MAX") as usize;
118        // The returned address of zx_vmar_map on success must be non-zero:
119        // https://fuchsia.dev/fuchsia-src/reference/syscalls/vmar_map
120        let base = NonNull::new(
121            vmar_root_self()
122                .map(0, &data_vmo, 0, len, zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE)
123                .map_err(|status| Error::Map("data", status))? as *mut u8,
124        )
125        .unwrap();
126
127        Ok((
128            Arc::new(Pool {
129                base,
130                bytes: len,
131                descriptors,
132                tx_alloc_state: Mutex::new(tx_alloc_state),
133                rx_pending: Pending::new(rx_free),
134                buffer_layout,
135                rx_leases: RxLeaseHandlingState::new_with_flags(options),
136            }),
137            descriptors_vmo,
138            data_vmo,
139        ))
140    }
141
142    /// Allocates `num_parts` tx descriptors.
143    ///
144    /// It will block if there are not enough descriptors. Note that the
145    /// descriptors are not initialized; you need to call [`AllocGuard::init()`]
146    /// on the returned [`AllocGuard`] if you want to send it to the driver
147    /// later. See [`AllocGuard<Rx>::into_tx()`] for an example where
148    /// [`AllocGuard::init()`] is not needed because the tx allocation will be
149    /// returned to the pool immediately and won't be sent to the driver.
150    pub(in crate::session) async fn alloc_tx(
151        self: &Arc<Self>,
152        num_parts: ChainLength,
153    ) -> AllocGuard<Tx> {
154        let receiver = {
155            let mut state = self.tx_alloc_state.lock();
156            match state.free_list.try_alloc(num_parts, &self.descriptors) {
157                Some(allocated) => {
158                    return AllocGuard::new(allocated, self.clone());
159                }
160                None => {
161                    let (request, receiver) = TxAllocReq::new(num_parts);
162                    state.requests.push_back(request);
163                    receiver
164                }
165            }
166        };
167        // The sender must not be dropped.
168        receiver.await.unwrap()
169    }
170
171    /// Allocates a tx [`Buffer`].
172    ///
173    /// The returned buffer will have `num_bytes` as its capacity; the method
174    /// will block if there are not enough buffers. An error is returned if the
175    /// requested size cannot meet the device's requirements, for example if
176    /// the size of the head or tail region would become unrepresentable in u16.
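    ///
    /// A minimal usage sketch, where `pool` is an `Arc<Pool>`; the frame size,
    /// frame type and payload (`frame_bytes`) are illustrative:
    ///
    /// ```ignore
    /// let mut buffer = pool.alloc_tx_buffer(1500).await?;
    /// buffer.set_frame_type(netdev::FrameType::Ethernet);
    /// buffer.write_at(0, &frame_bytes)?;
    /// ```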
177    pub(in crate::session) async fn alloc_tx_buffer(
178        self: &Arc<Self>,
179        num_bytes: usize,
180    ) -> Result<Buffer<Tx>> {
181        self.alloc_tx_buffers(num_bytes).await?.next().unwrap()
182    }
183
184    /// Waits for at least one TX buffer to be available and returns an iterator
185    /// of buffers with `num_bytes` as capacity.
186    ///
187    /// The returned iterator is guaranteed to yield at least one item (though
188    /// it might be an error if the requested size cannot meet the device
189    /// requirement).
190    ///
191    /// # Note
192    ///
193    /// Because a `Buffer<Tx>` is returned to the pool when it is dropped, the
194    /// returned iterator can seemingly yield items indefinitely if the yielded
195    /// `Buffer`s are dropped while iterating.
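    ///
    /// A minimal usage sketch; the batch size is illustrative. Collecting into
    /// a `Vec` keeps the buffers alive instead of returning them to the pool:
    ///
    /// ```ignore
    /// let buffers = pool
    ///     .alloc_tx_buffers(1500)
    ///     .await?
    ///     .take(8)
    ///     .collect::<Result<Vec<_>>>()?;
    /// ```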
196    pub(in crate::session) async fn alloc_tx_buffers<'a>(
197        self: &'a Arc<Self>,
198        num_bytes: usize,
199    ) -> Result<impl Iterator<Item = Result<Buffer<Tx>>> + 'a> {
200        let BufferLayout { min_tx_data, min_tx_head, min_tx_tail, length: buffer_length } =
201            self.buffer_layout;
202        let tx_head = usize::from(min_tx_head);
203        let tx_tail = usize::from(min_tx_tail);
204        let total_bytes = num_bytes.max(min_tx_data) + tx_head + tx_tail;
205        let num_parts = (total_bytes + buffer_length - 1) / buffer_length;
206        let chain_length = ChainLength::try_from(num_parts)?;
207        let first = self.alloc_tx(chain_length).await;
208        let iter = std::iter::once(first)
209            .chain(std::iter::from_fn(move || {
210                let mut state = self.tx_alloc_state.lock();
211                state
212                    .free_list
213                    .try_alloc(chain_length, &self.descriptors)
214                    .map(|allocated| AllocGuard::new(allocated, self.clone()))
215            }))
216            // Fuse afterwards so we're guaranteeing we can't see a new entry
217            // after having yielded `None` once.
218            .fuse()
219            .map(move |mut alloc| {
220                alloc.init(num_bytes)?;
221                Ok(alloc.into())
222            });
223        Ok(iter)
224    }
225
226    /// Frees rx descriptors.
227    pub(in crate::session) fn free_rx(&self, descs: impl IntoIterator<Item = DescId<Rx>>) {
228        self.rx_pending.extend(descs.into_iter().map(|mut desc| {
229            self.descriptors.borrow_mut(&mut desc).initialize(
230                ChainLength::ZERO,
231                0,
232                self.buffer_layout.length.try_into().unwrap(),
233                0,
234            );
235            desc
236        }));
237    }
238
239    /// Frees tx descriptors.
240    ///
241    /// # Panics
242    ///
243    /// Panics if given an empty chain.
244    fn free_tx(self: &Arc<Self>, chain: Chained<DescId<Tx>>) {
245        let free_impl = |free_list: &mut TxFreeList, chain: Chained<DescId<Tx>>| {
246            let mut descs = chain.into_iter();
247            // The following can't overflow because we can have at most u16::MAX
248            // descriptors: free_len + #(to_free) + #(descs in use) <= u16::MAX,
249            // Thus free_len + #(to_free) <= u16::MAX.
250            free_list.len += u16::try_from(descs.len()).unwrap();
251            let head = descs.next();
252            let old_head = std::mem::replace(&mut free_list.head, head);
253            let mut tail = descs.last();
254            let mut tail_ref = self
255                .descriptors
256                .borrow_mut(tail.as_mut().unwrap_or_else(|| free_list.head.as_mut().unwrap()));
257            tail_ref.set_nxt(old_head);
258        };
259
260        let mut state = self.tx_alloc_state.lock();
261        let TxAllocState { requests, free_list } = &mut *state;
262        let () = free_impl(free_list, chain);
263
264        // After putting the chain back into the free list, we try to fulfill
265        // any pending tx allocation requests.
266        while let Some(req) = requests.front() {
267            match free_list.try_alloc(req.size, &self.descriptors) {
268                Some(descs) => {
269                    // The unwrap is safe because we know requests is not empty.
270                    match requests
271                        .pop_front()
272                        .unwrap()
273                        .sender
274                        .send(AllocGuard::new(descs, self.clone()))
275                        .map_err(ManuallyDrop::new)
276                    {
277                        Ok(()) => {}
278                        Err(mut alloc) => {
279                            let AllocGuard { descs, pool } = alloc.deref_mut();
280                            // We can't run the Drop code for AllocGuard here to
281                            // return the descriptors though, because we are holding
282                            // the lock on the alloc state and the lock is not
283                            // reentrant, so we manually free the descriptors.
284                            let () =
285                                free_impl(free_list, std::mem::replace(descs, Chained::empty()));
286                            // Safety: alloc is wrapped in ManuallyDrop, so alloc.pool
287                            // will not be dropped twice.
288                            let () = unsafe {
289                                std::ptr::drop_in_place(pool);
290                            };
291                        }
292                    }
293                }
294                None => {
295                    if req.sender.is_canceled() {
296                        let _cancelled: Option<TxAllocReq> = requests.pop_front();
297                        continue;
298                    } else {
299                        break;
300                    }
301                }
302            }
303        }
304    }
305
306    /// Frees the completed tx descriptors chained from `head` back to the pool.
307    ///
308    /// Call this function when the driver hands back a completed tx descriptor.
309    pub(in crate::session) fn tx_completed(self: &Arc<Self>, head: DescId<Tx>) -> Result<()> {
310        let chain = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
311        Ok(self.free_tx(chain))
312    }
313
314    /// Creates a [`Buffer<Rx>`] corresponding to the completed rx descriptors.
315    ///
316    /// Whenever the driver hands back a completed rx descriptor, this function
317    /// can be used to create the buffer that is represented by those chained
318    /// descriptors.
319    pub(in crate::session) fn rx_completed(
320        self: &Arc<Self>,
321        head: DescId<Rx>,
322    ) -> Result<Buffer<Rx>> {
323        let descs = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
324        let alloc = AllocGuard::new(descs, self.clone());
325        Ok(alloc.into())
326    }
327}
328
329impl Drop for Pool {
330    fn drop(&mut self) {
331        unsafe {
332            vmar_root_self()
333                .unmap(self.base.as_ptr() as usize, self.bytes)
334                .expect("failed to unmap VMO for Pool")
335        }
336    }
337}
338
339impl TxFreeList {
340    /// Tries to allocate tx descriptors.
341    ///
342    /// Returns [`None`] if there are not enough descriptors.
343    fn try_alloc(
344        &mut self,
345        num_parts: ChainLength,
346        descriptors: &Descriptors,
347    ) -> Option<Chained<DescId<Tx>>> {
348        if u16::from(num_parts.get()) > self.len {
349            return None;
350        }
351
352        let free_list = std::iter::from_fn(|| -> Option<DescId<Tx>> {
353            let new_head = self.head.as_ref().and_then(|head| {
354                let nxt = descriptors.borrow(head).nxt();
355                nxt.map(|id| unsafe {
356                    // Safety: This is the nxt field of head of the free list,
357                    // it must be a tx descriptor id.
358                    DescId::from_raw(id)
359                })
360            });
361            std::mem::replace(&mut self.head, new_head)
362        });
363        let allocated = free_list.take(num_parts.get().into()).collect::<Chained<_>>();
364        assert_eq!(allocated.len(), num_parts.into());
365        self.len -= u16::from(num_parts.get());
366        Some(allocated)
367    }
368}
369
370/// The buffer that can be used by the [`Session`](crate::session::Session).
371///
372/// All [`Buffer`]s implement [`std::io::Read`] and [`Buffer<Tx>`]s implement
373/// [`std::io::Write`].
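///
/// A minimal I/O sketch using the std traits, assuming `buffer` is an
/// allocated `Buffer<Tx>`; the payload bytes are illustrative:
///
/// ```ignore
/// use std::io::{Read as _, Seek as _, SeekFrom, Write as _};
///
/// buffer.write_all(&[1, 2, 3])?;
/// let _ = buffer.seek(SeekFrom::Start(0))?;
/// let mut readback = [0u8; 3];
/// buffer.read_exact(&mut readback)?;
/// assert_eq!(readback, [1, 2, 3]);
/// ```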
374pub struct Buffer<K: AllocKind> {
375    /// The descriptors allocation.
376    alloc: AllocGuard<K>,
377    /// Underlying memory regions.
378    parts: Chained<BufferPart>,
379    /// The current absolute position to read/write within the [`Buffer`].
380    pos: usize,
381}
382
383impl<K: AllocKind> Debug for Buffer<K> {
384    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
385        let Self { alloc, parts, pos } = self;
386        f.debug_struct("Buffer")
387            .field("cap", &self.cap())
388            .field("alloc", alloc)
389            .field("parts", parts)
390            .field("pos", pos)
391            .finish()
392    }
393}
394
395impl<K: AllocKind> Buffer<K> {
396    /// Gets the capacity of the buffer in bytes as requested for allocation.
397    pub fn cap(&self) -> usize {
398        self.parts.iter().fold(0, |acc, part| acc + part.cap)
399    }
400
401    /// Gets the length of the buffer which is actually used.
402    pub fn len(&self) -> usize {
403        self.parts.iter().fold(0, |acc, part| acc + part.len)
404    }
405
406    /// Writes bytes to the buffer.
407    ///
408    /// Writes all of `src` into the buffer beginning at `offset`. Returns an
409    /// error if the buffer's capacity is too small to hold `src` starting at
410    /// `offset`; no partial writes are performed.
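    ///
    /// A round-trip sketch, assuming `buffer` is an allocated `Buffer<Tx>` with
    /// enough capacity; offsets and payload bytes are illustrative:
    ///
    /// ```ignore
    /// buffer.write_at(0, &[0xaa, 0xbb])?;
    /// let mut readback = [0u8; 2];
    /// buffer.read_at(0, &mut readback)?;
    /// assert_eq!(readback, [0xaa, 0xbb]);
    /// ```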
411    pub fn write_at(&mut self, offset: usize, src: &[u8]) -> Result<()> {
412        if self.cap() < offset + src.len() {
413            return Err(Error::TooSmall { size: self.cap(), offset, length: src.len() });
414        }
415        let mut part_start = 0;
416        let mut total = 0;
417        for part in self.parts.iter_mut() {
418            if offset + total < part_start + part.cap {
419                let written = part.write_at(offset + total - part_start, &src[total..])?;
420                total += written;
421                if total == src.len() {
422                    break;
423                }
424            } else {
425                part.len = part.cap;
426            }
427            part_start += part.cap;
428        }
429        assert_eq!(total, src.len());
430        Ok(())
431    }
432
433    /// Reads bytes from the buffer.
434    ///
435    /// Fills all of `dst` with bytes from the buffer beginning at `offset`.
436    /// Returns an error if the buffer does not contain enough bytes starting
437    /// at `offset`; no partial reads are performed.
438    pub fn read_at(&self, offset: usize, dst: &mut [u8]) -> Result<()> {
439        if self.len() < offset + dst.len() {
440            return Err(Error::TooSmall { size: self.len(), offset, length: dst.len() });
441        }
442        let mut part_start = 0;
443        let mut total = 0;
444        for part in self.parts.iter() {
445            if offset + total < part_start + part.cap {
446                let read = part.read_at(offset + total - part_start, &mut dst[total..])?;
447                total += read;
448                if total == dst.len() {
449                    break;
450                }
451            }
452            part_start += part.cap;
453        }
454        assert_eq!(total, dst.len());
455        Ok(())
456    }
457
458    /// Returns this buffer as a mutable slice if it's not fragmented.
459    pub fn as_slice_mut(&mut self) -> Option<&mut [u8]> {
460        match &mut (*self.parts)[..] {
461            [] => Some(&mut []),
462            [one] => Some(one.as_slice_mut()),
463            _ => None,
464        }
465    }
466
467    /// Pads the [`Buffer`] to minimum tx buffer length requirements.
468    pub(in crate::session) fn pad(&mut self) -> Result<()> {
469        let num_parts = self.parts.len();
470        let BufferLayout { min_tx_tail, min_tx_data, min_tx_head: _, length: _ } =
471            self.alloc.pool.buffer_layout;
472        let mut target = min_tx_data;
473        for (i, part) in self.parts.iter_mut().enumerate() {
474            let grow_cap = if i == num_parts - 1 {
475                let descriptor =
476                    self.alloc.descriptors().last().expect("descriptor must not be empty");
477                let data_length = descriptor.data_length();
478                let tail_length = descriptor.tail_length();
479                // data_length + tail_length <= buffer_length <= usize::MAX.
480                let rest = usize::try_from(data_length).unwrap() + usize::from(tail_length);
481                match rest.checked_sub(usize::from(min_tx_tail)) {
482                    Some(grow_cap) => Some(grow_cap),
483                    None => break,
484                }
485            } else {
486                None
487            };
488            target -= part.pad(target, grow_cap)?;
489        }
490        if target != 0 {
491            return Err(Error::Pad(min_tx_data, self.cap()));
492        }
493        Ok(())
494    }
495
496    /// Leaks the underlying buffer descriptors to the driver.
497    ///
498    /// Returns the head of the leaked allocation.
499    pub(in crate::session) fn leak(mut self) -> DescId<K> {
500        let descs = std::mem::replace(&mut self.alloc.descs, Chained::empty());
501        descs.into_iter().next().unwrap()
502    }
503
504    /// Retrieves the frame type of the buffer.
505    pub fn frame_type(&self) -> Result<netdev::FrameType> {
506        self.alloc.descriptor().frame_type()
507    }
508
509    /// Retrieves the buffer's source port.
510    pub fn port(&self) -> Port {
511        self.alloc.descriptor().port()
512    }
513}
514
515impl Buffer<Tx> {
516    /// Commits the metadata for the buffer to descriptors.
517    pub(in crate::session) fn commit(&mut self) {
518        for (part, mut descriptor) in self.parts.iter_mut().zip(self.alloc.descriptors_mut()) {
519            // The following unwrap is safe because part.len must be smaller than
520            // buffer_length, which is a u32.
521            descriptor.commit(u32::try_from(part.len).unwrap())
522        }
523    }
524
525    /// Sets the buffer's destination port.
526    pub fn set_port(&mut self, port: Port) {
527        self.alloc.descriptor_mut().set_port(port)
528    }
529
530    /// Sets the frame type of the buffer.
531    pub fn set_frame_type(&mut self, frame_type: netdev::FrameType) {
532        self.alloc.descriptor_mut().set_frame_type(frame_type)
533    }
534
535    /// Sets TxFlags of a Tx buffer.
536    pub fn set_tx_flags(&mut self, flags: netdev::TxFlags) {
537        self.alloc.descriptor_mut().set_tx_flags(flags)
538    }
539}
540
541impl Buffer<Rx> {
542    /// Turns an rx buffer into a tx one.
543    pub async fn into_tx(self) -> Buffer<Tx> {
544        let Buffer { alloc, parts, pos } = self;
545        Buffer { alloc: alloc.into_tx().await, parts, pos }
546    }
547
548    /// Retrieves RxFlags of an Rx Buffer.
549    pub fn rx_flags(&self) -> Result<netdev::RxFlags> {
550        self.alloc.descriptor().rx_flags()
551    }
552}
553
554impl AllocGuard<Rx> {
555    /// Turns an rx allocation into a tx one.
556    ///
557    /// To achieve this we have to move the same number of descriptors from
558    /// the tx pool to the rx pool, to compensate for these rx descriptors
559    /// being converted into tx ones.
560    async fn into_tx(mut self) -> AllocGuard<Tx> {
561        let mut tx = self.pool.alloc_tx(self.descs.len).await;
562        // [MaybeUninit<DescId<Tx>>; 4] and [MaybeUninit<DescId<Rx>>; 4] have the
563        // same memory layout because DescId is repr(transparent). So it is safe
564        // to transmute and swap the values between the storages. After the swap
565        // the drop implementation of self will return the descriptors back to
566        // the rx pool.
567        std::mem::swap(&mut self.descs.storage, unsafe {
568            std::mem::transmute(&mut tx.descs.storage)
569        });
570        tx
571    }
572}
573
574/// A non-empty container that has at most [`netdev::MAX_DESCRIPTOR_CHAIN`] elements.
575struct Chained<T> {
576    storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
577    len: ChainLength,
578}
579
580impl<T> Deref for Chained<T> {
581    type Target = [T];
582
583    fn deref(&self) -> &Self::Target {
584        // Safety: `self.storage[..self.len]` is already initialized.
585        unsafe { std::mem::transmute(&self.storage[..self.len.into()]) }
586    }
587}
588
589impl<T> DerefMut for Chained<T> {
590    fn deref_mut(&mut self) -> &mut Self::Target {
591        // Safety: `self.storage[..self.len]` is already initialized.
592        unsafe { std::mem::transmute(&mut self.storage[..self.len.into()]) }
593    }
594}
595
596impl<T> Drop for Chained<T> {
597    fn drop(&mut self) {
598        // Safety: `self.deref_mut()` is a slice of all initialized elements.
599        unsafe {
600            std::ptr::drop_in_place(self.deref_mut());
601        }
602    }
603}
604
605impl<T: Debug> Debug for Chained<T> {
606    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
607        f.debug_list().entries(self.iter()).finish()
608    }
609}
610
611impl<T> Chained<T> {
612    #[allow(clippy::uninit_assumed_init)]
613    fn empty() -> Self {
614        // Create an uninitialized array of `MaybeUninit`. The `assume_init` is
615        // safe because the type we are claiming to have initialized here is a
616        // bunch of `MaybeUninit`s, which do not require initialization.
617        // TODO(https://fxbug.dev/42160423): use MaybeUninit::uninit_array once it
618        // is stabilized.
619        // https://doc.rust-lang.org/std/mem/union.MaybeUninit.html#method.uninit_array
620        Self { storage: unsafe { MaybeUninit::uninit().assume_init() }, len: ChainLength::ZERO }
621    }
622}
623
624impl<T> FromIterator<T> for Chained<T> {
625    /// # Panics
626    ///
627    /// Panics if the iterator is empty or can yield more than
628    /// [`netdev::MAX_DESCRIPTOR_CHAIN`] elements.
629    fn from_iter<I: IntoIterator<Item = T>>(elements: I) -> Self {
630        let mut result = Self::empty();
631        let mut len = 0u8;
632        for (idx, e) in elements.into_iter().enumerate() {
633            result.storage[idx] = MaybeUninit::new(e);
634            len += 1;
635        }
636        assert!(len > 0);
637        // `len` can not be larger than `MAX_DESCRIPTOR_CHAIN`, otherwise we can't
638        // get here due to the bound checks on `result.storage`.
639        result.len = ChainLength::try_from(len).unwrap();
640        result
641    }
642}
643
644impl<T> IntoIterator for Chained<T> {
645    type Item = T;
646    type IntoIter = ChainedIter<T>;
647
648    fn into_iter(mut self) -> Self::IntoIter {
649        let len = self.len;
650        self.len = ChainLength::ZERO;
651        // Safety: we have reset the length to zero, so it is now safe to move out
652        // the values and set them to be uninitialized. The `assume_init` is
653        // safe because the type we are claiming to have initialized here is a
654        // bunch of `MaybeUninit`s, which do not require initialization.
655        // TODO(https://fxbug.dev/42160423): use MaybeUninit::uninit_array once it
656        // is stabilized.
657        #[allow(clippy::uninit_assumed_init)]
658        let storage =
659            std::mem::replace(&mut self.storage, unsafe { MaybeUninit::uninit().assume_init() });
660        ChainedIter { storage, len, consumed: 0 }
661    }
662}
663
664struct ChainedIter<T> {
665    storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
666    len: ChainLength,
667    consumed: u8,
668}
669
670impl<T> Iterator for ChainedIter<T> {
671    type Item = T;
672
673    fn next(&mut self) -> Option<Self::Item> {
674        if self.consumed < self.len.get() {
675            // Safety: it is safe now to replace that slot with an uninitialized
676            // value because we will advance consumed by 1.
677            let value = unsafe {
678                std::mem::replace(
679                    &mut self.storage[usize::from(self.consumed)],
680                    MaybeUninit::uninit(),
681                )
682                .assume_init()
683            };
684            self.consumed += 1;
685            Some(value)
686        } else {
687            None
688        }
689    }
690
691    fn size_hint(&self) -> (usize, Option<usize>) {
692        let len = usize::from(self.len.get() - self.consumed);
693        (len, Some(len))
694    }
695}
696
697impl<T> ExactSizeIterator for ChainedIter<T> {}
698
699impl<T> Drop for ChainedIter<T> {
700    fn drop(&mut self) {
701        // Safety: `self.storage[self.consumed..self.len]` is initialized.
702        unsafe {
703            std::ptr::drop_in_place(std::mem::transmute::<_, &mut [T]>(
704                &mut self.storage[self.consumed.into()..self.len.into()],
705            ));
706        }
707    }
708}
709
710/// Guards the allocated descriptors; they will be freed when dropped.
711pub(in crate::session) struct AllocGuard<K: AllocKind> {
712    descs: Chained<DescId<K>>,
713    pool: Arc<Pool>,
714}
715
716impl<K: AllocKind> Debug for AllocGuard<K> {
717    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
718        let Self { descs, pool: _ } = self;
719        f.debug_struct("AllocGuard").field("descs", descs).finish()
720    }
721}
722
723impl<K: AllocKind> AllocGuard<K> {
724    fn new(descs: Chained<DescId<K>>, pool: Arc<Pool>) -> Self {
725        Self { descs, pool }
726    }
727
728    /// Iterates over references to the descriptors.
729    fn descriptors(&self) -> impl Iterator<Item = DescRef<'_, K>> + '_ {
730        self.descs.iter().map(move |desc| self.pool.descriptors.borrow(desc))
731    }
732
733    /// Iterates over mutable references to the descriptors.
734    fn descriptors_mut(&mut self) -> impl Iterator<Item = DescRefMut<'_, K>> + '_ {
735        let descriptors = &self.pool.descriptors;
736        self.descs.iter_mut().map(move |desc| descriptors.borrow_mut(desc))
737    }
738
739    /// Gets a reference to the head descriptor.
740    fn descriptor(&self) -> DescRef<'_, K> {
741        self.descriptors().next().expect("descriptors must not be empty")
742    }
743
744    /// Gets a mutable reference to the head descriptor.
745    fn descriptor_mut(&mut self) -> DescRefMut<'_, K> {
746        self.descriptors_mut().next().expect("descriptors must not be empty")
747    }
748}
749
750impl AllocGuard<Tx> {
751    /// Initializes descriptors of a tx allocation.
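    ///
    /// For example (illustrative numbers): with a 64-byte buffer length, a
    /// 4-byte minimum head and an 8-byte minimum tail, a 100-byte request
    /// spanning two descriptors is laid out as head = 4, data = 60 on the
    /// first descriptor and data = 40, tail = 8 + 16 = 24 on the second; the
    /// extra 16 bytes of tail keep the caller from writing more than the
    /// requested 100 bytes.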
752    fn init(&mut self, mut requested_bytes: usize) -> Result<()> {
753        let len = self.len();
754        let BufferLayout { min_tx_head, min_tx_tail, length: buffer_length, min_tx_data: _ } =
755            self.pool.buffer_layout;
756        for (mut descriptor, clen) in self.descriptors_mut().zip((0..len).rev()) {
757            let chain_length = ChainLength::try_from(clen).unwrap();
758            let head_length = if clen + 1 == len { min_tx_head } else { 0 };
759            let mut tail_length = if clen == 0 { min_tx_tail } else { 0 };
760
761            // Available bytes = buffer length - head_length - tail_length. The check was
762            // done when the config for the pool was created, so the subtraction won't overflow.
763            let available_bytes =
764                u32::try_from(buffer_length - usize::from(head_length) - usize::from(tail_length))
765                    .unwrap();
766
767            let data_length = match u32::try_from(requested_bytes) {
768                Ok(requested) => {
769                    if requested < available_bytes {
770                        // The requested bytes are less than what is available,
771                        // we need to put the excess in the tail so that the
772                        // user cannot write more than they requested.
773                        tail_length = u16::try_from(available_bytes - requested)
774                            .ok_checked::<TryFromIntError>()
775                            .and_then(|tail_adjustment| tail_length.checked_add(tail_adjustment))
776                            .ok_or(Error::TxLength)?;
777                    }
778                    requested.min(available_bytes)
779                }
780                Err(TryFromIntError { .. }) => available_bytes,
781            };
782
783            requested_bytes -=
784                usize::try_from(data_length).unwrap_or_else(|TryFromIntError { .. }| {
785                    panic!(
786                        "data_length: {} must be smaller than requested_bytes: {}, which is a usize",
787                        data_length, requested_bytes
788                    )
789                });
790            descriptor.initialize(chain_length, head_length, data_length, tail_length);
791        }
792        assert_eq!(requested_bytes, 0);
793        Ok(())
794    }
795}
796
797impl<K: AllocKind> Drop for AllocGuard<K> {
798    fn drop(&mut self) {
799        if self.is_empty() {
800            return;
801        }
802        K::free(private::Allocation(self));
803    }
804}
805
806impl<K: AllocKind> Deref for AllocGuard<K> {
807    type Target = [DescId<K>];
808
809    fn deref(&self) -> &Self::Target {
810        self.descs.deref()
811    }
812}
813
814/// A contiguous region of the buffer; corresponding to one descriptor.
815///
816/// [`BufferPart`] owns the memory range [ptr, ptr+cap).
817struct BufferPart {
818    /// The data region starts at `ptr`.
819    ptr: *mut u8,
820    /// The capacity for the region is `cap`.
821    cap: usize,
822    /// Indicates how many bytes are actually in the buffer; it starts
823    /// as 0 for a tx buffer and as `cap` for an rx buffer. It will
824    /// be used later as `data_length` in the descriptor.
825    len: usize,
826}
827
828impl BufferPart {
829    /// Creates a new [`BufferPart`] that owns the memory region.
830    ///
831    /// # Safety
832    ///
833    /// The caller must make sure the memory pointed to by `ptr` outlives the
834    /// `BufferPart` being constructed. Once a `BufferPart` is constructed, it is
835    /// assumed that the memory `[ptr..ptr+cap)` is always valid to read and
836    /// write.
837    unsafe fn new(ptr: *mut u8, cap: usize, len: usize) -> Self {
838        Self { ptr, cap, len }
839    }
840
841    /// Reads bytes from this buffer part.
842    ///
843    /// Reads up to `dst.len()` bytes from the region beginning at `offset`,
844    /// returning how many bytes were read successfully. Partial read is
845    /// not considered as an error.
846    fn read_at(&self, offset: usize, dst: &mut [u8]) -> Result<usize> {
847        let available = self.len.checked_sub(offset).ok_or(Error::Index(offset, self.len))?;
848        let to_copy = std::cmp::min(available, dst.len());
849        // Safety: the source memory region is valid for reads and the
850        // destination memory region is valid for writes.
851        unsafe { std::ptr::copy_nonoverlapping(self.ptr.add(offset), dst.as_mut_ptr(), to_copy) }
852        Ok(to_copy)
853    }
854
855    /// Writes bytes to this buffer part.
856    ///
857    /// Writes up to `src.len()` bytes into the region beginning at `offset`,
858    /// returning how many bytes were written successfully. Partial write is
859    /// not considered as an error.
860    fn write_at(&mut self, offset: usize, src: &[u8]) -> Result<usize> {
861        let available = self.cap.checked_sub(offset).ok_or(Error::Index(offset, self.cap))?;
862        let to_copy = std::cmp::min(src.len(), available);
863        // Safety: the source memory region is valid for reads and the
864        // destination memory region is valid for writes.
865        unsafe { std::ptr::copy_nonoverlapping(src.as_ptr(), self.ptr.add(offset), to_copy) }
866        self.len = std::cmp::max(self.len, offset + to_copy);
867        Ok(to_copy)
868    }
869
870    /// Pads this part of buffer to have length `target`.
871    ///
872    /// `limit` describes the limit for this region to grow beyond capacity.
873    /// `None` means the part is not allowed to grow and padding must be done
874    /// within the existing capacity, `Some(limit)` means this part is allowed
875    /// to extend its capacity up to the limit.
876    fn pad(&mut self, target: usize, limit: Option<usize>) -> Result<usize> {
877        if target <= self.len {
878            return Ok(target);
879        }
880        if let Some(limit) = limit {
881            if target > limit {
882                return Err(Error::Pad(target, self.cap));
883            }
884            if self.cap < target {
885                self.cap = target
886            }
887        }
888        let new_len = std::cmp::min(target, self.cap);
889        // Safety: This is safe because the destination memory region is valid
890        // for write.
891        unsafe {
892            std::ptr::write_bytes(self.ptr.add(self.len), 0, new_len - self.len);
893        }
894        self.len = new_len;
895        Ok(new_len)
896    }
897
898    /// Returns the buffer part as a slice.
899    fn as_slice_mut(&mut self) -> &mut [u8] {
900        // SAFETY: BufferPart requires the caller to guarantee that the ptr used
901        // to create it outlives the instance. BufferPart itself is not Copy or
902        // Clone, so we can rely on the unsafe contract of `new` to uphold this.
903        unsafe { std::slice::from_raw_parts_mut(self.ptr, self.len) }
904    }
905}
906
907// `Buffer` needs to be `Send` in order to be useful in async code. Instead
908// of marking `Buffer` as `Send` directly, we mark `BufferPart` as `Send`
909// and let the compiler deduce `Send` for `Buffer`.
910unsafe impl Send for BufferPart {}
911
912impl Debug for BufferPart {
913    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
914        let BufferPart { len, cap, ptr } = &self;
915        f.debug_struct("BufferPart").field("ptr", ptr).field("len", len).field("cap", cap).finish()
916    }
917}
918
919impl<K: AllocKind> From<AllocGuard<K>> for Buffer<K> {
920    fn from(alloc: AllocGuard<K>) -> Self {
921        let AllocGuard { pool, descs: _ } = &alloc;
922        let parts: Chained<BufferPart> = alloc
923            .descriptors()
924            .map(|descriptor| {
925                // The following unwraps are safe because they are already
926                // checked in `DeviceInfo::config`.
927                let offset = usize::try_from(descriptor.offset()).unwrap();
928                let head_length = usize::from(descriptor.head_length());
929                let data_length = usize::try_from(descriptor.data_length()).unwrap();
930                let len = match K::REFL {
931                    AllocKindRefl::Tx => 0,
932                    AllocKindRefl::Rx => data_length,
933                };
934                // Sanity check: make sure the layout is valid.
935                assert!(
936                    offset + head_length <= pool.bytes,
937                    "buffer part starts beyond the end of pool"
938                );
939                assert!(
940                    offset + head_length + data_length <= pool.bytes,
941                    "buffer part ends beyond the end of pool"
942                );
943                // This is safe because the `AllocGuard` makes sure the
944                // underlying memory is valid for the entire time when
945                // `BufferPart` is alive; `add` is safe because
946                // `offset + head_length` is within the allocation and
947                // smaller than isize::MAX.
948                unsafe {
949                    BufferPart::new(pool.base.as_ptr().add(offset + head_length), data_length, len)
950                }
951            })
952            .collect();
953        Self { alloc, parts, pos: 0 }
954    }
955}
956
957impl<K: AllocKind> Read for Buffer<K> {
958    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
959        self.read_at(self.pos, buf)
960            .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidInput, error))?;
961        self.pos += buf.len();
962        Ok(buf.len())
963    }
964}
965
966impl Write for Buffer<Tx> {
967    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
968        self.write_at(self.pos, buf)
969            .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidInput, error))?;
970        self.pos += buf.len();
971        Ok(buf.len())
972    }
973
974    fn flush(&mut self) -> std::io::Result<()> {
975        Ok(())
976    }
977}
978
979impl<K: AllocKind> Seek for Buffer<K> {
980    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
981        let pos = match pos {
982            SeekFrom::Start(pos) => pos,
983            SeekFrom::End(offset) => {
984                let end = i64::try_from(self.cap())
985                    .map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
986                u64::try_from(end.wrapping_add(offset)).unwrap()
987            }
988            SeekFrom::Current(offset) => {
989                let current = i64::try_from(self.pos)
990                    .map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
991                u64::try_from(current.wrapping_add(offset)).unwrap()
992            }
993        };
994        self.pos =
995            usize::try_from(pos).map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
996        Ok(pos)
997    }
998}
999
1000/// A pending tx allocation request.
1001struct TxAllocReq {
1002    sender: Sender<AllocGuard<Tx>>,
1003    size: ChainLength,
1004}
1005
1006impl TxAllocReq {
1007    fn new(size: ChainLength) -> (Self, Receiver<AllocGuard<Tx>>) {
1008        let (sender, receiver) = channel();
1009        (TxAllocReq { sender, size }, receiver)
1010    }
1011}
1012
1013/// A module for sealed traits so that users of this crate cannot implement
1014/// [`AllocKind`] for anything other than [`Rx`] and [`Tx`].
1015mod private {
1016    use super::{AllocKind, Rx, Tx};
1017    pub trait Sealed: 'static + Sized {}
1018    impl Sealed for Rx {}
1019    impl Sealed for Tx {}
1020
1021    // We can't leak a private type in a public trait, so create an opaque
1022    // private newtype for &mut super::AllocGuard so that we can mention it in the
1023    // AllocKind trait.
1024    pub struct Allocation<'a, K: AllocKind>(pub(super) &'a mut super::AllocGuard<K>);
1025}
1026
1027/// An allocation can have two kinds; this trait provides a way to project a
1028/// type ([`Rx`] or [`Tx`]) into a value.
1029pub trait AllocKind: private::Sealed {
1030    /// The reflected value of Self.
1031    const REFL: AllocKindRefl;
1032
1033    /// Frees an allocation of the given kind.
1034    fn free(alloc: private::Allocation<'_, Self>);
1035}
1036
1037/// A tag type used to relate types and values for Tx allocations.
1038pub enum Tx {}
1039/// A tag type used to relate types and values for Rx allocations.
1040pub enum Rx {}
1041
1042/// The reflected value that allows inspection on an [`AllocKind`] type.
1043pub enum AllocKindRefl {
1044    Tx,
1045    Rx,
1046}
1047
1048impl AllocKindRefl {
1049    pub(in crate::session) fn as_str(&self) -> &'static str {
1050        match self {
1051            AllocKindRefl::Tx => "Tx",
1052            AllocKindRefl::Rx => "Rx",
1053        }
1054    }
1055}
1056
1057impl AllocKind for Tx {
1058    const REFL: AllocKindRefl = AllocKindRefl::Tx;
1059
1060    fn free(alloc: private::Allocation<'_, Self>) {
1061        let private::Allocation(AllocGuard { pool, descs }) = alloc;
1062        pool.free_tx(std::mem::replace(descs, Chained::empty()));
1063    }
1064}
1065
1066impl AllocKind for Rx {
1067    const REFL: AllocKindRefl = AllocKindRefl::Rx;
1068
1069    fn free(alloc: private::Allocation<'_, Self>) {
1070        let private::Allocation(AllocGuard { pool, descs }) = alloc;
1071        pool.free_rx(std::mem::replace(descs, Chained::empty()));
1072        pool.rx_leases.rx_complete();
1073    }
1074}
1075
1076/// An extracted struct containing state pertaining to watching rx leases.
1077pub(in crate::session) struct RxLeaseHandlingState {
1078    can_watch_rx_leases: AtomicBool,
1079    /// Keeps a rolling counter of received rx frames MINUS the target frame
1080    /// number of the current outstanding lease.
1081    ///
1082    /// When no leases are pending (via [`RxLeaseWatcher::wait_until`]),
1083    /// then this matches exactly the number of received frames.
1084    ///
1085    /// Otherwise, the lease is currently waiting for the remaining `u64::MAX -
1086    /// rx_frame_counter` frames. The logic depends on `AtomicU64` wrapping
1087    /// around as part of completing rx buffers.
1088    rx_frame_counter: AtomicU64,
1089    rx_lease_waker: AtomicWaker,
1090}
1091
1092impl RxLeaseHandlingState {
1093    fn new_with_flags(flags: netdev::SessionFlags) -> Self {
1094        Self::new_with_enabled(flags.contains(netdev::SessionFlags::RECEIVE_RX_POWER_LEASES))
1095    }
1096
1097    fn new_with_enabled(enabled: bool) -> Self {
1098        Self {
1099            can_watch_rx_leases: AtomicBool::new(enabled),
1100            rx_frame_counter: AtomicU64::new(0),
1101            rx_lease_waker: AtomicWaker::new(),
1102        }
1103    }
1104
1105    /// Increments the total receive frame counter and possibly wakes up a
1106    /// waiting lease yielder.
1107    fn rx_complete(&self) {
1108        let Self { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } = self;
1109        let prev = rx_frame_counter.fetch_add(1, atomic::Ordering::SeqCst);
1110
1111        // See wait_until for details. We need to wake the waker whenever our
1112        // add wraps the u64 back around to 0.
1113        if prev == u64::MAX {
1114            rx_lease_waker.wake();
1115        }
1116    }
1117}
1118
1119/// A trait allowing [`RxLeaseWatcher`] to be agnostic over how to get an
1120/// [`RxLeaseHandlingState`].
1121pub(in crate::session) trait RxLeaseHandlingStateContainer {
1122    fn lease_handling_state(&self) -> &RxLeaseHandlingState;
1123}
1124
1125impl<T: Borrow<RxLeaseHandlingState>> RxLeaseHandlingStateContainer for T {
1126    fn lease_handling_state(&self) -> &RxLeaseHandlingState {
1127        self.borrow()
1128    }
1129}
1130
1131impl RxLeaseHandlingStateContainer for Arc<Pool> {
1132    fn lease_handling_state(&self) -> &RxLeaseHandlingState {
1133        &self.rx_leases
1134    }
1135}
1136
1137/// A type-safe wrapper around a single lease watcher per `Pool`.
1138pub(in crate::session) struct RxLeaseWatcher<T> {
1139    state: T,
1140}
1141
1142impl<T: RxLeaseHandlingStateContainer> RxLeaseWatcher<T> {
1143    /// Creates a new lease watcher.
1144    ///
1145    /// # Panics
1146    ///
1147    /// Panics if an [`RxLeaseWatcher`] has already been created for the given
1148    /// pool or the pool was not configured for it.
1149    pub(in crate::session) fn new(state: T) -> Self {
1150        assert!(
1151            state.lease_handling_state().can_watch_rx_leases.swap(false, atomic::Ordering::SeqCst),
1152            "can't watch rx leases"
1153        );
1154        Self { state }
1155    }
1156
1157    /// Called by sessions to wait until `hold_until_frame` is fulfilled to
1158    /// yield leases out.
1159    ///
1160    /// Blocks until the `hold_until_frame`-th rx buffer has been released.
1161    ///
1162    /// Note that this method takes `&mut self` because only one
1163    /// [`RxLeaseWatcher`] may be created per lease handling state, and exclusive
1164    /// access to it is required to watch lease completion.
1165    pub(in crate::session) async fn wait_until(&mut self, hold_until_frame: u64) {
1166        // A note about wrap-arounds.
1167        //
1168        // We're assuming the frame counter will never wrap around for
1169        // correctness here. This should be fine, even assuming a packet
1170        // rate of 1 million pps it'd take almost 600k years for this counter
1171        // to wrap around:
1172        // - 2^64 / 1e6 / 60 / 60 / 24 / 365 ~ 584e3.
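        //
        // Worked example (illustrative numbers): if 5 frames have been
        // received so far (counter == 5) and `hold_until_frame` is 8, then
        // `prev` is 5 and the subtraction below wraps both the counter and
        // `threshold` to u64::MAX - 2. Three more `rx_complete` calls step
        // the counter through u64::MAX and wrap it to 0, which is less than
        // `threshold`, so the wait finishes exactly at the 8th frame.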
1173
1174        let RxLeaseHandlingState { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } =
1175            self.state.lease_handling_state();
1176
1177        let prev = rx_frame_counter.fetch_sub(hold_until_frame, atomic::Ordering::SeqCst);
1178        // After having subtracted the waiting value we *must always restore the
1179        // value* on return, even if the future is not polled to completion.
1180        let _guard = scopeguard::guard((), |()| {
1181            let _: u64 = rx_frame_counter.fetch_add(hold_until_frame, atomic::Ordering::SeqCst);
1182        });
1183
1184        // Lease is ready to be fulfilled.
1185        if prev >= hold_until_frame {
1186            return;
1187        }
1188        // Threshold is a wrapped around subtraction. So now we must wait
1189        // until the read value from the atomic is LESS THAN the threshold.
1190        let threshold = prev.wrapping_sub(hold_until_frame);
1191        futures::future::poll_fn(|cx| {
1192            let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
1193            if v < threshold {
1194                return Poll::Ready(());
1195            }
1196            rx_lease_waker.register(cx.waker());
1197            let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
1198            if v < threshold {
1199                return Poll::Ready(());
1200            }
1201            Poll::Pending
1202        })
1203        .await;
1204    }
1205}
1206
1207#[cfg(test)]
1208mod tests {
1209    use super::*;
1210
1211    use assert_matches::assert_matches;
1212    use fuchsia_async as fasync;
1213    use futures::future::FutureExt;
1214    use test_case::test_case;
1215
1216    use std::collections::HashSet;
1217    use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize};
1218    use std::pin::pin;
1219    use std::task::{Poll, Waker};
1220
1221    const DEFAULT_MIN_TX_BUFFER_HEAD: u16 = 4;
1222    const DEFAULT_MIN_TX_BUFFER_TAIL: u16 = 8;
1223    // These unwraps can't fail because none of the values are zero.
1224    const DEFAULT_BUFFER_LENGTH: NonZeroUsize = NonZeroUsize::new(64).unwrap();
1225    const DEFAULT_TX_BUFFERS: NonZeroU16 = NonZeroU16::new(8).unwrap();
1226    const DEFAULT_RX_BUFFERS: NonZeroU16 = NonZeroU16::new(8).unwrap();
1227    const MAX_BUFFER_BYTES: usize = DEFAULT_BUFFER_LENGTH.get()
1228        * netdev::MAX_DESCRIPTOR_CHAIN as usize
1229        - DEFAULT_MIN_TX_BUFFER_HEAD as usize
1230        - DEFAULT_MIN_TX_BUFFER_TAIL as usize;
1231
1232    const SENTINEL_BYTE: u8 = 0xab;
1233    const WRITE_BYTE: u8 = 1;
1234    const PAD_BYTE: u8 = 0;
1235
1236    const DEFAULT_CONFIG: Config = Config {
1237        buffer_stride: NonZeroU64::new(DEFAULT_BUFFER_LENGTH.get() as u64).unwrap(),
1238        num_rx_buffers: DEFAULT_RX_BUFFERS,
1239        num_tx_buffers: DEFAULT_TX_BUFFERS,
1240        options: netdev::SessionFlags::empty(),
1241        buffer_layout: BufferLayout {
1242            length: DEFAULT_BUFFER_LENGTH.get(),
1243            min_tx_head: DEFAULT_MIN_TX_BUFFER_HEAD,
1244            min_tx_tail: DEFAULT_MIN_TX_BUFFER_TAIL,
1245            min_tx_data: 0,
1246        },
1247    };
1248
1249    impl Pool {
1250        fn new_test_default() -> Arc<Self> {
1251            let (pool, _descriptors, _data) =
1252                Pool::new(DEFAULT_CONFIG).expect("failed to create default pool");
1253            pool
1254        }
1255
1256        async fn alloc_tx_checked(self: &Arc<Self>, n: u8) -> AllocGuard<Tx> {
1257            self.alloc_tx(ChainLength::try_from(n).expect("failed to convert to chain length"))
1258                .await
1259        }
1260
1261        fn alloc_tx_now_or_never(self: &Arc<Self>, n: u8) -> Option<AllocGuard<Tx>> {
1262            self.alloc_tx_checked(n).now_or_never()
1263        }
1264
1265        fn alloc_tx_all(self: &Arc<Self>, n: u8) -> Vec<AllocGuard<Tx>> {
1266            std::iter::from_fn(|| self.alloc_tx_now_or_never(n)).collect()
1267        }
1268
1269        fn alloc_tx_buffer_now_or_never(self: &Arc<Self>, num_bytes: usize) -> Option<Buffer<Tx>> {
1270            self.alloc_tx_buffer(num_bytes)
1271                .now_or_never()
1272                .transpose()
1273                .expect("invalid arguments for alloc_tx_buffer")
1274        }
1275
1276        fn set_min_tx_buffer_length(self: &mut Arc<Self>, length: usize) {
1277            Arc::get_mut(self).unwrap().buffer_layout.min_tx_data = length;
1278        }
1279
1280        fn fill_sentinel_bytes(&mut self) {
1281            // Safety: We have mut reference to Pool, so we get to modify the
1282            // VMO pointed by self.base.
1283            unsafe { std::ptr::write_bytes(self.base.as_ptr(), SENTINEL_BYTE, self.bytes) };
1284        }
1285    }
1286
1287    impl Buffer<Tx> {
1288        // Write a byte at offset; the resulting buffer should be pad_size long, with
1289        // 0..offset being the SENTINEL_BYTE, offset being the WRITE_BYTE and the
1290        // rest being PAD_BYTE.
1291        fn check_write_and_pad(&mut self, offset: usize, pad_size: usize) {
1292            self.write_at(offset, &[WRITE_BYTE][..]).expect("failed to write to self");
1293            self.pad().expect("failed to pad");
1294            assert_eq!(self.len(), pad_size);
1295            // An arbitrary value that is not SENTINEL/WRITE/PAD_BYTE so that
1296            // we can make sure the write really happened.
1297            const INIT_BYTE: u8 = 42;
1298            let mut read_buf = vec![INIT_BYTE; pad_size];
1299            self.read_at(0, &mut read_buf[..]).expect("failed to read from self");
1300            for (idx, byte) in read_buf.iter().enumerate() {
1301                if idx < offset {
1302                    assert_eq!(*byte, SENTINEL_BYTE);
1303                } else if idx == offset {
1304                    assert_eq!(*byte, WRITE_BYTE);
1305                } else {
1306                    assert_eq!(*byte, PAD_BYTE);
1307                }
1308            }
1309        }
1310    }
1311
1312    impl<K, I, T> PartialEq<T> for Chained<DescId<K>>
1313    where
1314        K: AllocKind,
1315        I: ExactSizeIterator<Item = u16>,
1316        T: Copy + IntoIterator<IntoIter = I>,
1317    {
1318        fn eq(&self, other: &T) -> bool {
1319            let iter = other.into_iter();
1320            if usize::from(self.len) != iter.len() {
1321                return false;
1322            }
1323            self.iter().zip(iter).all(|(l, r)| l.get() == r)
1324        }
1325    }
1326
1327    impl Debug for TxAllocReq {
1328        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1329            let TxAllocReq { sender: _, size } = self;
1330            f.debug_struct("TxAllocReq").field("size", &size).finish_non_exhaustive()
1331        }
1332    }
1333
1334    #[test]
1335    fn alloc_tx_distinct() {
1336        let pool = Pool::new_test_default();
1337        let allocated = pool.alloc_tx_all(1);
1338        assert_eq!(allocated.len(), DEFAULT_TX_BUFFERS.get().into());
1339        let distinct = allocated
1340            .iter()
1341            .map(|alloc| {
1342                assert_eq!(alloc.descs.len(), 1);
1343                alloc.descs[0].get()
1344            })
1345            .collect::<HashSet<u16>>();
1346        assert_eq!(allocated.len(), distinct.len());
1347    }
1348
1349    #[test]
1350    fn alloc_tx_free_len() {
1351        let pool = Pool::new_test_default();
1352        {
1353            let allocated = pool.alloc_tx_all(2);
1354            assert_eq!(
1355                allocated.iter().fold(0, |acc, a| { acc + a.descs.len() }),
1356                DEFAULT_TX_BUFFERS.get().into()
1357            );
1358            assert_eq!(pool.tx_alloc_state.lock().free_list.len, 0);
1359        }
1360        assert_eq!(pool.tx_alloc_state.lock().free_list.len, DEFAULT_TX_BUFFERS.get());
1361    }
1362
1363    #[test]
1364    fn alloc_tx_chain() {
1365        let pool = Pool::new_test_default();
1366        let allocated = pool.alloc_tx_all(3);
1367        assert_eq!(allocated.len(), usize::from(DEFAULT_TX_BUFFERS.get()) / 3);
1368        assert_matches!(pool.alloc_tx_now_or_never(3), None);
1369        assert_matches!(pool.alloc_tx_now_or_never(2), Some(a) if a.descs.len() == 2);
1370    }
1371
1372    #[test]
1373    fn alloc_tx_many() {
1374        let pool = Pool::new_test_default();
1375        let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1376            - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1377            - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1378        let data_len = usize::try_from(data_len).unwrap();
1379        let mut buffers = pool
1380            .alloc_tx_buffers(data_len)
1381            .now_or_never()
1382            .expect("failed to alloc")
1383            .unwrap()
1384            // Collect into a vec to keep the buffers alive; otherwise they
1385            // would be returned to the pool immediately.
1386            .collect::<Result<Vec<_>>>()
1387            .expect("buffer error");
1388        assert_eq!(buffers.len(), DEFAULT_TX_BUFFERS.get().into());
1389
1390        // We have all the buffers, which means allocating more should not
1391        // resolve.
1392        assert!(pool.alloc_tx_buffers(data_len).now_or_never().is_none());
1393
1394        // If we release a single buffer we should be able to retrieve it again.
1395        assert_matches!(buffers.pop(), Some(_));
1396        let mut more_buffers =
1397            pool.alloc_tx_buffers(data_len).now_or_never().expect("failed to alloc").unwrap();
1398        let buffer = assert_matches!(more_buffers.next(), Some(Ok(b)) => b);
1399        assert_matches!(more_buffers.next(), None);
1400        // The iterator is fused, so None is yielded even after dropping the
1401        // buffer.
1402        drop(buffer);
1403        assert_matches!(more_buffers.next(), None);
1404    }
1405
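    // A chained allocation that cannot be satisfied succeeds once enough
    // single-descriptor allocations have been freed.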
1406    #[test]
1407    fn alloc_tx_after_free() {
1408        let pool = Pool::new_test_default();
1409        let mut allocated = pool.alloc_tx_all(1);
1410        assert_matches!(pool.alloc_tx_now_or_never(2), None);
1411        {
1412            let _drained = allocated.drain(..2);
1413        }
1414        assert_matches!(pool.alloc_tx_now_or_never(2), Some(a) if a.descs.len() == 2);
1415    }
1416
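    // A blocked tx allocation is queued, resolves with the freed descriptors,
    // and its request is removed from the queue once fulfilled.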
1417    #[test]
1418    fn blocking_alloc_tx() {
1419        let mut executor = fasync::TestExecutor::new();
1420        let pool = Pool::new_test_default();
1421        let mut allocated = pool.alloc_tx_all(1);
1422        let alloc_fut = pool.alloc_tx_checked(1);
1423        let mut alloc_fut = pin!(alloc_fut);
1424        // The allocation should block.
1425        assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
1426        // And the allocation request should be queued.
1427        assert!(!pool.tx_alloc_state.lock().requests.is_empty());
1428        let freed = allocated
1429            .pop()
1430            .expect("no fulfilled allocations")
1431            .iter()
1432            .map(|x| x.get())
1433            .collect::<Chained<_>>();
1434        let same_as_freed =
1435            |descs: &Chained<DescId<Tx>>| descs.iter().map(|x| x.get()).eq(freed.iter().copied());
1436        // Now the task should be able to continue.
1437        assert_matches!(
1438            &executor.run_until_stalled(&mut alloc_fut),
1439            Poll::Ready(AllocGuard{ descs, pool: _ }) if same_as_freed(descs)
1440        );
1441        // And the queued request should now be removed.
1442        assert!(pool.tx_alloc_state.lock().requests.is_empty());
1443    }
1444
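    // Dropping a pending allocation future cancels its queued request; a
    // descriptor freed afterwards goes back to the free list.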
1445    #[test]
1446    fn blocking_alloc_tx_cancel_before_free() {
1447        let mut executor = fasync::TestExecutor::new();
1448        let pool = Pool::new_test_default();
1449        let mut allocated = pool.alloc_tx_all(1);
1450        {
1451            let alloc_fut = pool.alloc_tx_checked(1);
1452            let mut alloc_fut = pin!(alloc_fut);
1453            assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
1454            assert_matches!(
1455                pool.tx_alloc_state.lock().requests.as_slices(),
1456                (&[ref req1, ref req2], &[]) if req1.size.get() == 1 && req2.size.get() == 1
1457            );
1458        }
1459        assert_matches!(
1460            allocated.pop(),
1461            Some(AllocGuard { ref descs, pool: ref p })
1462                if descs == &[DEFAULT_TX_BUFFERS.get() - 1] && Arc::ptr_eq(p, &pool)
1463        );
1464        let state = pool.tx_alloc_state.lock();
1465        assert_eq!(state.free_list.len, 1);
1466        assert!(state.requests.is_empty());
1467    }
1468
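    // Even if a descriptor is freed while the request is queued, dropping the
    // allocation future leaves the descriptor on the free list rather than
    // leaking it.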
1469    #[test]
1470    fn blocking_alloc_tx_cancel_after_free() {
1471        let mut executor = fasync::TestExecutor::new();
1472        let pool = Pool::new_test_default();
1473        let mut allocated = pool.alloc_tx_all(1);
1474        {
1475            let alloc_fut = pool.alloc_tx_checked(1);
1476            let mut alloc_fut = pin!(alloc_fut);
1477            assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
1478            assert_matches!(
1479                pool.tx_alloc_state.lock().requests.as_slices(),
1480                (&[ref req1, ref req2], &[]) if req1.size.get() == 1 && req2.size.get() == 1
1481            );
1482            assert_matches!(
1483                allocated.pop(),
1484                Some(AllocGuard { ref descs, pool: ref p })
1485                    if descs == &[DEFAULT_TX_BUFFERS.get() - 1] && Arc::ptr_eq(p, &pool)
1486            );
1487        }
1488        let state = pool.tx_alloc_state.lock();
1489        assert_eq!(state.free_list.len, 1);
1490        assert!(state.requests.is_empty());
1491    }
1492
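    // Queued allocation requests are fulfilled in the order they were made,
    // each resolving only after enough descriptors for its size have been
    // freed.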
1493    #[test]
1494    fn multiple_blocking_alloc_tx_fulfill_order() {
1495        const TASKS_TOTAL: usize = 3;
1496        let mut executor = fasync::TestExecutor::new();
1497        let pool = Pool::new_test_default();
1498        let mut allocated = pool.alloc_tx_all(1);
1499        let mut alloc_futs = (1..=TASKS_TOTAL)
1500            .rev()
1501            .map(|x| {
1502                let pool = pool.clone();
1503                (x, Box::pin(async move { pool.alloc_tx_checked(x.try_into().unwrap()).await }))
1504            })
1505            .collect::<Vec<_>>();
1506
1507        for (idx, (req_size, task)) in alloc_futs.iter_mut().enumerate() {
1508            assert_matches!(executor.run_until_stalled(task), Poll::Pending);
1509            // Assert that the tasks are sorted in decreasing order of requested size.
1510            assert_eq!(idx + *req_size, TASKS_TOTAL);
1511        }
1512        {
1513            let state = pool.tx_alloc_state.lock();
1514            // The first pending request was introduced by `alloc_tx_all`.
1515            assert_eq!(state.requests.len(), TASKS_TOTAL + 1);
1516            let mut requests = state.requests.iter();
1517            // It should already be cancelled because its requesting future
1518            // was dropped.
1519            assert!(requests.next().unwrap().sender.is_canceled());
1520            // The rest of the requests must not be cancelled.
1521            assert!(requests.all(|req| !req.sender.is_canceled()))
1522        }
1523
1524        let mut to_free = Vec::new();
1525        let mut freed = 0;
1526        for free_size in (1..=TASKS_TOTAL).rev() {
1527            let (_req_size, mut task) = alloc_futs.remove(0);
1528            for _ in 1..free_size {
1529                freed += 1;
1530                assert_matches!(
1531                    allocated.pop(),
1532                    Some(AllocGuard { ref descs, pool: ref p })
1533                        if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
1534                );
1535                assert_matches!(executor.run_until_stalled(&mut task), Poll::Pending);
1536            }
1537            freed += 1;
1538            assert_matches!(
1539                allocated.pop(),
1540                Some(AllocGuard { ref descs, pool: ref p })
1541                    if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
1542            );
1543            match executor.run_until_stalled(&mut task) {
1544                Poll::Ready(alloc) => {
1545                    assert_eq!(alloc.len(), free_size);
1546                    // Don't return the allocation to the pool yet.
1547                    to_free.push(alloc);
1548                }
1549                Poll::Pending => panic!("The request should be fulfilled"),
1550            }
1551            // The rest of the requests cannot be fulfilled yet.
1552            for (_req_size, task) in alloc_futs.iter_mut() {
1553                assert_matches!(executor.run_until_stalled(task), Poll::Pending);
1554            }
1555        }
1556        assert!(pool.tx_alloc_state.lock().requests.is_empty());
1557    }
1558
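    // A maximal single-descriptor tx buffer reserves the configured head and
    // tail room, and its offset and capacity match the pool layout.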
1559    #[test]
1560    fn singleton_tx_layout() {
1561        let pool = Pool::new_test_default();
1562        let buffers = std::iter::from_fn(|| {
1563            let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1564                - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1565                - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1566            pool.alloc_tx_buffer_now_or_never(usize::try_from(data_len).unwrap()).map(|buffer| {
1567                assert_eq!(buffer.alloc.descriptors().count(), 1);
1568                let offset = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1569                    * u64::from(buffer.alloc[0].get());
1570                {
1571                    let descriptor = buffer.alloc.descriptor();
1572                    assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
1573                    assert_eq!(descriptor.head_length(), DEFAULT_MIN_TX_BUFFER_HEAD);
1574                    assert_eq!(descriptor.tail_length(), DEFAULT_MIN_TX_BUFFER_TAIL);
1575                    assert_eq!(descriptor.data_length(), data_len);
1576                    assert_eq!(descriptor.offset(), offset);
1577                }
1578
1579                assert_eq!(buffer.parts.len(), 1);
1580                let BufferPart { ptr, len, cap } = buffer.parts[0];
1581                assert_eq!(len, 0);
1582                assert_eq!(
1583                    // Using wrapping_add because we will never dereference the
1584                    // resulting pointer and it saves us an unsafe block.
1585                    pool.base.as_ptr().wrapping_add(
1586                        usize::try_from(offset).unwrap() + usize::from(DEFAULT_MIN_TX_BUFFER_HEAD),
1587                    ),
1588                    ptr
1589                );
1590                assert_eq!(data_len, u32::try_from(cap).unwrap());
1591                buffer
1592            })
1593        })
1594        .collect::<Vec<_>>();
1595        assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()));
1596    }
1597
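    // In a 4-descriptor chain only the first descriptor reserves head room and
    // only the last reserves tail room; offsets and `nxt` links follow the
    // chain order.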
1598    #[test]
1599    fn chained_tx_layout() {
1600        let pool = Pool::new_test_default();
1601        let alloc_len = 4 * DEFAULT_BUFFER_LENGTH.get()
1602            - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1603            - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1604        let buffers = std::iter::from_fn(|| {
1605            pool.alloc_tx_buffer_now_or_never(alloc_len).map(|buffer| {
1606                assert_eq!(buffer.parts.len(), 4);
1607                for (idx, descriptor) in buffer.alloc.descriptors().enumerate() {
1608                    let chain_length = ChainLength::try_from(buffer.alloc.len() - idx - 1).unwrap();
1609                    let head_length = if idx == 0 { DEFAULT_MIN_TX_BUFFER_HEAD } else { 0 };
1610                    let tail_length = if chain_length == ChainLength::ZERO {
1611                        DEFAULT_MIN_TX_BUFFER_TAIL
1612                    } else {
1613                        0
1614                    };
1615                    let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1616                        - u32::from(head_length)
1617                        - u32::from(tail_length);
1618                    let offset = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1619                        * u64::from(buffer.alloc[idx].get());
1620                    assert_eq!(descriptor.chain_length().unwrap(), chain_length);
1621                    assert_eq!(descriptor.head_length(), head_length);
1622                    assert_eq!(descriptor.tail_length(), tail_length);
1623                    assert_eq!(descriptor.offset(), offset);
1624                    assert_eq!(descriptor.data_length(), data_len);
1625                    if chain_length != ChainLength::ZERO {
1626                        assert_eq!(descriptor.nxt(), Some(buffer.alloc[idx + 1].get()));
1627                    }
1628
1629                    let BufferPart { ptr, cap, len } = buffer.parts[idx];
1630                    assert_eq!(len, 0);
1631                    assert_eq!(
1632                        // Using wrapping_add because we will never dereference
1633                        // the resulting ptr and it saves us an unsafe block.
1634                        pool.base.as_ptr().wrapping_add(
1635                            usize::try_from(offset).unwrap() + usize::from(head_length),
1636                        ),
1637                        ptr
1638                    );
1639                    assert_eq!(data_len, u32::try_from(cap).unwrap());
1640                }
1641                buffer
1642            })
1643        })
1644        .collect::<Vec<_>>();
1645        assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()) / 4);
1646    }
1647
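    // All rx descriptors start out pending and are pairwise distinct.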
1648    #[test]
1649    fn rx_distinct() {
1650        let pool = Pool::new_test_default();
1651        let mut guard = pool.rx_pending.inner.lock();
1652        let (descs, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
1653        assert_eq!(descs.len(), usize::from(DEFAULT_RX_BUFFERS.get()));
1654        let distinct = descs.iter().map(|desc| desc.get()).collect::<HashSet<u16>>();
1655        assert_eq!(descs.len(), distinct.len());
1656    }
1657
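    // Rx descriptors are unchained and expose the whole buffer as data, with no
    // head or tail room reserved.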
1658    #[test]
1659    fn alloc_rx_layout() {
1660        let pool = Pool::new_test_default();
1661        let mut guard = pool.rx_pending.inner.lock();
1662        let (descs, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
1663        assert_eq!(descs.len(), usize::from(DEFAULT_RX_BUFFERS.get()));
1664        for desc in descs.iter() {
1665            let descriptor = pool.descriptors.borrow(desc);
1666            let offset =
1667                u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap() * u64::from(desc.get());
1668            assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
1669            assert_eq!(descriptor.head_length(), 0);
1670            assert_eq!(descriptor.tail_length(), 0);
1671            assert_eq!(descriptor.offset(), offset);
1672            assert_eq!(
1673                descriptor.data_length(),
1674                u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1675            );
1676        }
1677    }
1678
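    // Bytes written with `write_at` read back unchanged through `read_at`,
    // including across the part boundary.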
1679    #[test]
1680    fn buffer_read_at_write_at() {
1681        let pool = Pool::new_test_default();
1682        let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
1683        let mut buffer =
1684            pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
1685        // Because we have to accommodate the space for the head and tail,
1686        // the buffer has 2 parts instead of 1.
1687        assert_eq!(buffer.parts.len(), 2);
1688        assert_eq!(buffer.cap(), alloc_bytes);
1689        let write_buf = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();
1690        buffer.write_at(0, &write_buf[..]).expect("failed to write into buffer");
1691        let mut read_buf = [0xff; DEFAULT_BUFFER_LENGTH.get()];
1692        buffer.read_at(0, &mut read_buf[..]).expect("failed to read from buffer");
1693        for (idx, byte) in read_buf.iter().enumerate() {
1694            assert_eq!(*byte, write_buf[idx]);
1695        }
1696    }
1697
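    // The `Read`/`Write`/`Seek` impls share one cursor: seeking relative to the
    // end repositions it and subsequent reads observe the bytes written earlier.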
1698    #[test]
1699    fn buffer_read_write_seek() {
1700        let pool = Pool::new_test_default();
1701        let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
1702        let mut buffer =
1703            pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
1704        // Because we have to accommodate the space for the head and tail,
1705        // the buffer has 2 parts instead of 1.
1706        assert_eq!(buffer.parts.len(), 2);
1707        assert_eq!(buffer.cap(), alloc_bytes);
1708        let write_buf = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();
1709        assert_eq!(
1710            buffer.write(&write_buf[..]).expect("failed to write into buffer"),
1711            write_buf.len()
1712        );
1713        const SEEK_FROM_END: usize = 64;
1714        const READ_LEN: usize = 12;
1715        assert_eq!(
1716            buffer.seek(SeekFrom::End(-i64::try_from(SEEK_FROM_END).unwrap())).unwrap(),
1717            u64::try_from(buffer.cap() - SEEK_FROM_END).unwrap()
1718        );
1719        let mut read_buf = [0xff; READ_LEN];
1720        assert_eq!(
1721            buffer.read(&mut read_buf[..]).expect("failed to read from buffer"),
1722            read_buf.len()
1723        );
1724        assert_eq!(&write_buf[..READ_LEN], &read_buf[..]);
1725    }
1726
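    // Writing a single byte and then padding must fill the buffer up to the
    // minimum tx length, whether the buffer spans one part or several.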
1727    #[test_case(32; "single buffer part")]
1728    #[test_case(MAX_BUFFER_BYTES; "multiple buffer parts")]
1729    fn buffer_pad(pad_size: usize) {
1730        let mut pool = Pool::new_test_default();
1731        pool.set_min_tx_buffer_length(pad_size);
1732        for offset in 0..pad_size {
1733            Arc::get_mut(&mut pool)
1734                .expect("there are multiple owners of the underlying VMO")
1735                .fill_sentinel_bytes();
1736            let mut buffer =
1737                pool.alloc_tx_buffer_now_or_never(pad_size).expect("failed to allocate buffer");
1738            buffer.check_write_and_pad(offset, pad_size);
1739        }
1740    }
1741
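    // Padding a chained buffer must extend its length across the later parts of
    // the chain to reach the minimum tx buffer length.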
1742    #[test]
1743    fn buffer_pad_grow() {
1744        const BUFFER_PARTS: u8 = 3;
1745        let mut pool = Pool::new_test_default();
1746        let pad_size = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1747            * u32::from(BUFFER_PARTS)
1748            - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1749            - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1750        pool.set_min_tx_buffer_length(pad_size.try_into().unwrap());
1751        for offset in 0..pad_size - u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap() {
1752            Arc::get_mut(&mut pool)
1753                .expect("there are multiple owners of the underlying VMO")
1754                .fill_sentinel_bytes();
1755            let mut alloc =
1756                pool.alloc_tx_now_or_never(BUFFER_PARTS).expect("failed to alloc descriptors");
1757            alloc
1758                .init(
1759                    DEFAULT_BUFFER_LENGTH.get() * usize::from(BUFFER_PARTS)
1760                        - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1761                        - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL),
1762                )
1763                .expect("head/body/tail sizes are representable with u16/u32/u16");
1764            let mut buffer = Buffer::try_from(alloc).unwrap();
1765            buffer.check_write_and_pad(offset.try_into().unwrap(), pad_size.try_into().unwrap());
1766        }
1767    }
1768
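    // A single-byte write marks every part before the write as fully used, ends
    // the part containing the byte right after it, and leaves later parts
    // empty.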
1769    #[test_case(  0; "writes at the beginning")]
1770    #[test_case( 15; "writes in the first part")]
1771    #[test_case( 75; "writes in the second part")]
1772    #[test_case(135; "writes in the third part")]
1773    #[test_case(195; "writes in the last part")]
1774    fn buffer_used(write_offset: usize) {
1775        let pool = Pool::new_test_default();
1776        let mut buffer =
1777            pool.alloc_tx_buffer_now_or_never(MAX_BUFFER_BYTES).expect("failed to allocate buffer");
1778        let expected_caps = (0..netdev::MAX_DESCRIPTOR_CHAIN).map(|i| {
1779            if i == 0 {
1780                DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1781            } else if i < netdev::MAX_DESCRIPTOR_CHAIN - 1 {
1782                DEFAULT_BUFFER_LENGTH.get()
1783            } else {
1784                DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL)
1785            }
1786        });
1787        assert_eq!(buffer.alloc.len(), netdev::MAX_DESCRIPTOR_CHAIN.into());
1788        buffer.write_at(write_offset, &[WRITE_BYTE][..]).expect("failed to write to buffer");
1789        // The accumulator is Some if we haven't found the part where the byte
1790        // was written, None if we've already found it.
1791        assert_eq!(
1792            buffer.parts.iter().zip(expected_caps).fold(
1793                Some(write_offset),
1794                |offset, (part, expected_cap)| {
1795                    // The cap must match the expectation.
1796                    assert_eq!(part.cap, expected_cap);
1797
1798                    match offset {
1799                        Some(offset) => {
1800                            if offset >= expected_cap {
1801                                // The part should have used all the capacity.
1802                                assert_eq!(part.len, part.cap);
1803                                Some(offset - part.len)
1804                            } else {
1805                                // The part should end right after our byte.
1806                                assert_eq!(part.len, offset + 1);
1807                                let mut buf = [0];
1808                                // Verify that the byte is indeed written.
1809                                assert_matches!(part.read_at(offset, &mut buf), Ok(1));
1810                                assert_eq!(buf[0], WRITE_BYTE);
1811                                None
1812                            }
1813                        }
1814                        None => {
1815                            // We should have never written in this part.
1816                            assert_eq!(part.len, 0);
1817                            None
1818                        }
1819                    }
1820                }
1821            ),
1822            None
1823        )
1824    }
1825
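    // `commit` writes each part's length into its descriptor's data length;
    // head, data, and tail lengths always add up to the full buffer length.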
1826    #[test]
1827    fn buffer_commit() {
1828        let pool = Pool::new_test_default();
1829        for offset in 0..MAX_BUFFER_BYTES {
1830            let mut buffer = pool
1831                .alloc_tx_buffer_now_or_never(MAX_BUFFER_BYTES)
1832                .expect("failed to allocate buffer");
1833            buffer.write_at(offset, &[1][..]).expect("failed to write to buffer");
1834            buffer.commit();
1835            for (part, descriptor) in buffer.parts.iter().zip(buffer.alloc.descriptors()) {
1836                let head_length = descriptor.head_length();
1837                let tail_length = descriptor.tail_length();
1838                let data_length = descriptor.data_length();
1839                assert_eq!(u32::try_from(part.len).unwrap(), data_length);
1840                assert_eq!(
1841                    u32::from(head_length + tail_length) + data_length,
1842                    u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap(),
1843                );
1844            }
1845        }
1846    }
1847
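    // When the device minimum exceeds the requested size, the buffer exposes
    // only the requested capacity until `pad` grows it to `min_tx_data`,
    // zero-filling the bytes past what was written.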
1848    #[test]
1849    fn allocate_under_device_minimum() {
1850        const MIN_TX_DATA: usize = 32;
1851        const ALLOC_SIZE: usize = 16;
1852        const WRITE_BYTE: u8 = 0xff;
1853        const WRITE_SENTINEL_BYTE: u8 = 0xee;
1854        const READ_SENTINEL_BYTE: u8 = 0xdd;
1855        let mut config = DEFAULT_CONFIG;
1856        config.buffer_layout.min_tx_data = MIN_TX_DATA;
1857        let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create a new pool");
1858        for mut buffer in Vec::from_iter(std::iter::from_fn({
1859            let pool = pool.clone();
1860            move || pool.alloc_tx_buffer_now_or_never(MIN_TX_DATA)
1861        })) {
1862            buffer.write_at(0, &[WRITE_SENTINEL_BYTE; MIN_TX_DATA]).expect("failed to write");
1863        }
1864        let mut allocated =
1865            pool.alloc_tx_buffer_now_or_never(ALLOC_SIZE).expect("failed to allocate buffer");
1866        assert_eq!(allocated.cap(), ALLOC_SIZE);
1867        const WRITE_BUF_SIZE: usize = ALLOC_SIZE + 1;
1868        assert_matches!(
1869            allocated.write_at(0, &[WRITE_BYTE; WRITE_BUF_SIZE]),
1870            Err(Error::TooSmall { size: ALLOC_SIZE, offset: 0, length: WRITE_BUF_SIZE })
1871        );
1872        allocated.write_at(0, &[WRITE_BYTE; ALLOC_SIZE]).expect("failed to write to buffer");
1873        assert_matches!(allocated.pad(), Ok(()));
1874        assert_eq!(allocated.cap(), MIN_TX_DATA);
1875        assert_eq!(allocated.len(), MIN_TX_DATA);
1876        const READ_BUF_SIZE: usize = MIN_TX_DATA + 1;
1877        let mut read_buf = [READ_SENTINEL_BYTE; READ_BUF_SIZE];
1878        assert_matches!(
1879            allocated.read_at(0, &mut read_buf[..]),
1880            Err(Error::TooSmall { size: MIN_TX_DATA, offset: 0, length: READ_BUF_SIZE })
1881        );
1882        allocated.read_at(0, &mut read_buf[..MIN_TX_DATA]).expect("failed to read from buffer");
1883        assert_eq!(&read_buf[..ALLOC_SIZE], &[WRITE_BYTE; ALLOC_SIZE][..]);
1884        assert_eq!(&read_buf[ALLOC_SIZE..MIN_TX_DATA], &[0x0; ALLOC_SIZE][..]);
1885        assert_eq!(&read_buf[MIN_TX_DATA..], &[READ_SENTINEL_BYTE; 1][..]);
1886    }
1887
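    // A buffer length that does not fit the descriptor size fields makes tx
    // allocation fail with `Error::TxLength`.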
1888    #[test]
1889    fn invalid_tx_length() {
1890        let mut config = DEFAULT_CONFIG;
1891        config.buffer_layout.length = usize::from(u16::MAX) + 2;
1892        config.buffer_layout.min_tx_head = 0;
1893        let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create pool");
1894        assert_matches!(pool.alloc_tx_buffer(1).now_or_never(), Some(Err(Error::TxLength)));
1895    }
1896
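    // `wait_until` resolves immediately if the rx frame counter already reached
    // the target, and otherwise stays pending until enough rx frames complete.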
1897    #[test]
1898    fn rx_leases() {
1899        let mut executor = fuchsia_async::TestExecutor::new();
1900        let state = RxLeaseHandlingState::new_with_enabled(true);
1901        let mut watcher = RxLeaseWatcher { state: &state };
1902
1903        {
1904            let mut fut = pin!(watcher.wait_until(0));
1905            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
1906        }
1907        {
1908            state.rx_complete();
1909            let mut fut = pin!(watcher.wait_until(1));
1910            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
1911        }
1912        {
1913            let mut fut = pin!(watcher.wait_until(0));
1914            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
1915        }
1916        {
1917            let mut fut = pin!(watcher.wait_until(3));
1918            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
1919            state.rx_complete();
1920            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
1921            state.rx_complete();
1922            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
1923        }
1924        // Dropping the wait future without seeing it complete restores the
1925        // counter value.
1926        let counter_before = state.rx_frame_counter.load(atomic::Ordering::SeqCst);
1927        {
1928            let mut fut = pin!(watcher.wait_until(10000));
1929            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
1930        }
1931        let counter_after = state.rx_frame_counter.load(atomic::Ordering::SeqCst);
1932        assert_eq!(counter_before, counter_after);
1933    }
1934}