netdevice_client/session/buffer/
pool.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Fuchsia netdevice buffer pool.
6
7use fuchsia_sync::Mutex;
8use futures::task::AtomicWaker;
9use std::borrow::Borrow;
10use std::collections::VecDeque;
11use std::convert::TryInto as _;
12use std::fmt::Debug;
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::mem::MaybeUninit;
15use std::num::TryFromIntError;
16use std::ops::{Deref, DerefMut};
17use std::ptr::NonNull;
18use std::sync::Arc;
19use std::sync::atomic::{self, AtomicBool, AtomicU64};
20use std::task::Poll;
21
22use arrayvec::ArrayVec;
23use explicit::ResultExt as _;
24use fidl_fuchsia_hardware_network as netdev;
25use fuchsia_runtime::vmar_root_self;
26use futures::channel::oneshot::{Receiver, Sender, channel};
27use zx::AsHandleRef as _;
28
29use super::{ChainLength, DescId, DescRef, DescRefMut, Descriptors};
30use crate::error::{Error, Result};
31use crate::session::{BufferLayout, Config, Pending, Port};
32
/// Responsible for managing [`Buffer`]s for a [`Session`](crate::session::Session).
pub(in crate::session) struct Pool {
    /// Base address of the pool.
    ///
    /// Start of the data-VMO mapping created in [`Pool::new`]; unmapped in
    /// `Drop`.
    // Note: This field requires us to manually implement `Sync` and `Send`.
    base: NonNull<u8>,
    /// The length of the pool in bytes.
    bytes: usize,
    /// The descriptors allocated for the pool.
    descriptors: Descriptors,
    /// Shared state for allocation.
    ///
    /// Guards both the tx free list and the queue of blocked tx allocation
    /// requests; never held across an `.await`.
    tx_alloc_state: Mutex<TxAllocState>,
    /// The free rx descriptors pending to be sent to driver.
    pub(in crate::session) rx_pending: Pending<Rx>,
    /// The buffer layout.
    buffer_layout: BufferLayout,
    /// State-keeping allowing sessions to handle rx leases.
    rx_leases: RxLeaseHandlingState,
}
51
// `Pool` is `Send` and `Sync`, and this allows the compiler to deduce `Buffer`
// to be `Send`. These impls are safe because we can safely share `Pool` and
// `&Pool`: the implementation would never allocate the same buffer to two
// callers at the same time.
// SAFETY: the raw `base` pointer is the only non-auto-Send/Sync field; the
// region it points at is only handed out through non-overlapping allocations.
unsafe impl Send for Pool {}
unsafe impl Sync for Pool {}
58
/// The shared state which keeps track of available buffers and tx buffers.
struct TxAllocState {
    /// All pending tx allocation requests.
    ///
    /// Requests are queued when the free list cannot satisfy them and are
    /// fulfilled in FIFO order as descriptors are freed (see `Pool::free_tx`).
    requests: VecDeque<TxAllocReq>,
    /// The free tx descriptors available for allocation.
    free_list: TxFreeList,
}
65
/// We use a linked list to maintain the tx free descriptors - they are linked
/// through their `nxt` fields, note this differs from the chaining expected
/// by the network device protocol:
/// - You can chain more than [`netdev::MAX_DESCRIPTOR_CHAIN`] descriptors
///   together.
/// - the free-list ends when the `nxt` field is 0xff, while the normal chain
///   ends when `chain_length` becomes 0.
struct TxFreeList {
    /// The head of a linked list of available descriptors that can be allocated
    /// for tx.
    ///
    /// `None` when the free list is empty.
    head: Option<DescId<Tx>>,
    /// How many free descriptors are there in the pool.
    len: u16,
}
80
impl Pool {
    /// Creates a new [`Pool`] and its backing [`zx::Vmo`]s.
    ///
    /// Returns [`Pool`] and the [`zx::Vmo`]s for descriptors and data, in that
    /// order.
    pub(in crate::session) fn new(config: Config) -> Result<(Arc<Self>, zx::Vmo, zx::Vmo)> {
        let Config { buffer_stride, num_rx_buffers, num_tx_buffers, options, buffer_layout } =
            config;
        let num_buffers = num_rx_buffers.get() + num_tx_buffers.get();
        let (descriptors, descriptors_vmo, tx_free, mut rx_free) =
            Descriptors::new(num_tx_buffers, num_rx_buffers, buffer_stride)?;

        // Construct the free list. Folding from the back links each
        // descriptor's `nxt` to the previous head, so the first tx descriptor
        // ends up as the head of the list.
        let free_head = tx_free.into_iter().rev().fold(None, |head, mut curr| {
            descriptors.borrow_mut(&mut curr).set_nxt(head);
            Some(curr)
        });

        // Rx descriptors start out unchained, with the full buffer length as
        // data and no head/tail regions.
        for rx_desc in rx_free.iter_mut() {
            descriptors.borrow_mut(rx_desc).initialize(
                ChainLength::ZERO,
                0,
                buffer_layout.length.try_into().unwrap(),
                0,
            );
        }

        let tx_alloc_state = TxAllocState {
            free_list: TxFreeList { head: free_head, len: num_tx_buffers.get() },
            requests: VecDeque::new(),
        };

        let size = buffer_stride.get() * u64::from(num_buffers);
        let data_vmo = zx::Vmo::create(size).map_err(|status| Error::Vmo("data", status))?;

        const VMO_NAME: zx::Name =
            const_unwrap::const_unwrap_result(zx::Name::new("netdevice:data"));
        data_vmo.set_name(&VMO_NAME).map_err(|status| Error::Vmo("set name", status))?;
        // `as` is OK because `size` is positive and smaller than isize::MAX.
        // This is following the practice of rust stdlib to ensure allocation
        // size never reaches isize::MAX.
        // https://doc.rust-lang.org/std/primitive.pointer.html#method.add-1.
        let len = isize::try_from(size).expect("VMO size larger than isize::MAX") as usize;
        // The returned address of zx_vmar_map on success must be non-zero:
        // https://fuchsia.dev/fuchsia-src/reference/syscalls/vmar_map
        let base = NonNull::new(
            vmar_root_self()
                .map(0, &data_vmo, 0, len, zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE)
                .map_err(|status| Error::Map("data", status))? as *mut u8,
        )
        .unwrap();

        Ok((
            Arc::new(Pool {
                base,
                bytes: len,
                descriptors,
                tx_alloc_state: Mutex::new(tx_alloc_state),
                rx_pending: Pending::new(rx_free),
                buffer_layout,
                rx_leases: RxLeaseHandlingState::new_with_flags(options),
            }),
            descriptors_vmo,
            data_vmo,
        ))
    }

    /// Allocates `num_parts` tx descriptors.
    ///
    /// It will block if there are not enough descriptors. Note that the
    /// descriptors are not initialized, you need to call [`AllocGuard::init()`]
    /// on the returned [`AllocGuard`] if you want to send it to the driver
    /// later. See [`AllocGuard<Rx>::into_tx()`] for an example where
    /// [`AllocGuard::init()`] is not needed because the tx allocation will be
    /// returned to the pool immediately and won't be sent to the driver.
    pub(in crate::session) async fn alloc_tx(
        self: &Arc<Self>,
        num_parts: ChainLength,
    ) -> AllocGuard<Tx> {
        // The lock is confined to this block so that it is released before we
        // await on the receiver below.
        let receiver = {
            let mut state = self.tx_alloc_state.lock();
            match state.free_list.try_alloc(num_parts, &self.descriptors) {
                Some(allocated) => {
                    return AllocGuard::new(allocated, self.clone());
                }
                None => {
                    // Not enough free descriptors right now: queue a request
                    // that `free_tx` will fulfill once descriptors return.
                    let (request, receiver) = TxAllocReq::new(num_parts);
                    state.requests.push_back(request);
                    receiver
                }
            }
        };
        // The sender must not be dropped.
        receiver.await.unwrap()
    }

    /// Allocates a tx [`Buffer`].
    ///
    /// The returned buffer will have `num_bytes` as its capacity, the method
    /// will block if there are not enough buffers. An error will be returned if
    /// the requested size cannot meet the device requirement, for example, if
    /// the size of the head or tail region will become unrepresentable in u16.
    pub(in crate::session) async fn alloc_tx_buffer(
        self: &Arc<Self>,
        num_bytes: usize,
    ) -> Result<Buffer<Tx>> {
        // `alloc_tx_buffers` always yields at least one item, so the unwrap
        // is safe.
        self.alloc_tx_buffers(num_bytes).await?.next().unwrap()
    }

    /// Waits for at least one TX buffer to be available and returns an iterator
    /// of buffers with `num_bytes` as capacity.
    ///
    /// The returned iterator is guaranteed to yield at least one item (though
    /// it might be an error if the requested size cannot meet the device
    /// requirement).
    ///
    /// # Note
    ///
    /// Given a `Buffer<Tx>` is returned to the pool when it's dropped, the
    /// returned iterator will seemingly yield infinite items if the yielded
    /// `Buffer`s are dropped while iterating.
    pub(in crate::session) async fn alloc_tx_buffers<'a>(
        self: &'a Arc<Self>,
        num_bytes: usize,
    ) -> Result<impl Iterator<Item = Result<Buffer<Tx>>> + 'a> {
        let BufferLayout { min_tx_data, min_tx_head, min_tx_tail, length: buffer_length } =
            self.buffer_layout;
        let tx_head = usize::from(min_tx_head);
        let tx_tail = usize::from(min_tx_tail);
        // Total bytes needed, including the device-mandated head/tail regions;
        // `num_parts` is the ceiling division by the per-descriptor length.
        let total_bytes = num_bytes.max(min_tx_data) + tx_head + tx_tail;
        let num_parts = (total_bytes + buffer_length - 1) / buffer_length;
        let chain_length = ChainLength::try_from(num_parts)?;
        // Await only for the first allocation; subsequent items are produced
        // opportunistically from whatever is free at iteration time.
        let first = self.alloc_tx(chain_length).await;
        let iter = std::iter::once(first)
            .chain(std::iter::from_fn(move || {
                let mut state = self.tx_alloc_state.lock();
                state
                    .free_list
                    .try_alloc(chain_length, &self.descriptors)
                    .map(|allocated| AllocGuard::new(allocated, self.clone()))
            }))
            // Fuse afterwards so we're guaranteeing we can't see a new entry
            // after having yielded `None` once.
            .fuse()
            .map(move |mut alloc| {
                alloc.init(num_bytes)?;
                Ok(alloc.into())
            });
        Ok(iter)
    }

    /// Frees rx descriptors.
    ///
    /// Each descriptor is reset to an unchained, full-length layout before
    /// being queued for hand-off back to the driver.
    pub(in crate::session) fn free_rx(&self, descs: impl IntoIterator<Item = DescId<Rx>>) {
        self.rx_pending.extend(descs.into_iter().map(|mut desc| {
            self.descriptors.borrow_mut(&mut desc).initialize(
                ChainLength::ZERO,
                0,
                self.buffer_layout.length.try_into().unwrap(),
                0,
            );
            desc
        }));
    }

    /// Frees tx descriptors.
    ///
    /// # Panics
    ///
    /// Panics if given an empty chain.
    fn free_tx(self: &Arc<Self>, chain: Chained<DescId<Tx>>) {
        // We store any pending request that need to be fulfilled in the stack
        // here, to fulfill them only once we drop the lock, guaranteeing an
        // AllocGuard can't be dropped while the lock is held.
        let mut to_fulfill = ArrayVec::<
            (TxAllocReq, AllocGuard<Tx>),
            { netdev::MAX_DESCRIPTOR_CHAIN as usize },
        >::new();

        let mut state = self.tx_alloc_state.lock();

        {
            // Splice the freed chain onto the front of the free list: the
            // chain's head becomes the new list head and the chain's tail
            // (or the head itself for a single-element chain, since `descs`
            // is then exhausted) is linked to the old head.
            let mut descs = chain.into_iter();
            // The following can't overflow because we can have at most u16::MAX
            // descriptors: free_len + #(to_free) + #(descs in use) <= u16::MAX,
            // Thus free_len + #(to_free) <= u16::MAX.
            state.free_list.len += u16::try_from(descs.len()).unwrap();
            let head = descs.next();
            let old_head = std::mem::replace(&mut state.free_list.head, head);
            let mut tail = descs.last();
            let mut tail_ref = self.descriptors.borrow_mut(
                tail.as_mut().unwrap_or_else(|| state.free_list.head.as_mut().unwrap()),
            );
            tail_ref.set_nxt(old_head);
        }

        // After putting the chain back into the free list, we try to fulfill
        // any pending tx allocation requests.
        while let Some(req) = state.requests.front() {
            // Skip a request that we know is canceled.
            //
            // This is an optimization for long-ago dropped requests, since the
            // receiver side can be dropped between here and fulfillment later.
            if req.sender.is_canceled() {
                let _cancelled: Option<TxAllocReq> = state.requests.pop_front();
                continue;
            }
            let size = req.size;
            match state.free_list.try_alloc(size, &self.descriptors) {
                Some(descs) => {
                    // The unwrap is safe because we know requests is not empty.
                    let req = state.requests.pop_front().unwrap();
                    to_fulfill.push((req, AllocGuard::new(descs, self.clone())));

                    // If we're full temporarily release the lock to go again
                    // later. Fulfilling a request must _always_ be done without
                    // holding the lock.
                    if to_fulfill.is_full() {
                        drop(state);
                        for (req, alloc) in to_fulfill.drain(..) {
                            req.fulfill(alloc)
                        }
                        state = self.tx_alloc_state.lock();
                    }
                }
                None => break,
            }
        }

        // Make sure we're not holding the state lock when fulfilling requests.
        drop(state);
        // Fulfill any ready requests.
        for (req, alloc) in to_fulfill {
            req.fulfill(alloc)
        }
    }

    /// Frees the completed tx descriptors chained by head to the pool.
    ///
    /// Call this function when the driver hands back a completed tx descriptor.
    pub(in crate::session) fn tx_completed(self: &Arc<Self>, head: DescId<Tx>) -> Result<()> {
        let chain = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
        Ok(self.free_tx(chain))
    }

    /// Creates a [`Buffer<Rx>`] corresponding to the completed rx descriptors.
    ///
    /// Whenever the driver hands back a completed rx descriptor, this function
    /// can be used to create the buffer that is represented by those chained
    /// descriptors.
    pub(in crate::session) fn rx_completed(
        self: &Arc<Self>,
        head: DescId<Rx>,
    ) -> Result<Buffer<Rx>> {
        let descs = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
        let alloc = AllocGuard::new(descs, self.clone());
        Ok(alloc.into())
    }
}
339
impl Drop for Pool {
    fn drop(&mut self) {
        // SAFETY: `base`/`bytes` describe exactly the mapping created in
        // `Pool::new`. Buffers keep the pool alive through an `Arc`, so by the
        // time `Pool` is dropped no outstanding pointers into the mapping can
        // remain.
        unsafe {
            vmar_root_self()
                .unmap(self.base.as_ptr() as usize, self.bytes)
                .expect("failed to unmap VMO for Pool")
        }
    }
}
349
impl TxFreeList {
    /// Tries to allocate tx descriptors.
    ///
    /// Pops `num_parts` descriptors off the front of the free list.
    /// Returns [`None`] if there are not enough descriptors.
    fn try_alloc(
        &mut self,
        num_parts: ChainLength,
        descriptors: &Descriptors,
    ) -> Option<Chained<DescId<Tx>>> {
        if u16::from(num_parts.get()) > self.len {
            return None;
        }

        // Each call pops the current head and advances `self.head` to the id
        // stored in the head's `nxt` field.
        let free_list = std::iter::from_fn(|| -> Option<DescId<Tx>> {
            let new_head = self.head.as_ref().and_then(|head| {
                let nxt = descriptors.borrow(head).nxt();
                nxt.map(|id| unsafe {
                    // Safety: This is the nxt field of head of the free list,
                    // it must be a tx descriptor id.
                    DescId::from_raw(id)
                })
            });
            std::mem::replace(&mut self.head, new_head)
        });
        let allocated = free_list.take(num_parts.get().into()).collect::<Chained<_>>();
        // The length check above guarantees the list holds enough entries.
        assert_eq!(allocated.len(), num_parts.into());
        self.len -= u16::from(num_parts.get());
        Some(allocated)
    }
}
380
/// The buffer that can be used by the [`Session`](crate::session::Session).
///
/// All [`Buffer`]s implement [`std::io::Read`] and [`Buffer<Tx>`]s implement
/// [`std::io::Write`].
pub struct Buffer<K: AllocKind> {
    /// The descriptors allocation.
    ///
    /// Owns the underlying descriptors; dropping it returns them to the pool.
    alloc: AllocGuard<K>,
    /// Underlying memory regions.
    ///
    /// One [`BufferPart`] per descriptor in `alloc`, in chain order.
    parts: Chained<BufferPart>,
    /// The current absolute position to read/write within the [`Buffer`].
    pos: usize,
}
393
394impl<K: AllocKind> Debug for Buffer<K> {
395    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
396        let Self { alloc, parts, pos } = self;
397        f.debug_struct("Buffer")
398            .field("cap", &self.cap())
399            .field("alloc", alloc)
400            .field("parts", parts)
401            .field("pos", pos)
402            .finish()
403    }
404}
405
impl<K: AllocKind> Buffer<K> {
    /// Gets the capacity of the buffer in bytes as requested for allocation.
    pub fn cap(&self) -> usize {
        self.parts.iter().fold(0, |acc, part| acc + part.cap)
    }

    /// Gets the length of the buffer which is actually used.
    pub fn len(&self) -> usize {
        self.parts.iter().fold(0, |acc, part| acc + part.len)
    }

    /// Writes bytes to the buffer.
    ///
    /// Writes `src.len()` bytes into the buffer beginning at `offset`. Returns
    /// [`Error::TooSmall`] if the write would exceed the buffer's capacity;
    /// otherwise all of `src` is written (there are no partial writes — a
    /// short write is a bug and would trip the assertion below).
    pub fn write_at(&mut self, offset: usize, src: &[u8]) -> Result<()> {
        if self.cap() < offset + src.len() {
            return Err(Error::TooSmall { size: self.cap(), offset, length: src.len() });
        }
        // `part_start` is the absolute offset of the current part; `total` is
        // how many bytes of `src` have been written so far.
        let mut part_start = 0;
        let mut total = 0;
        for part in self.parts.iter_mut() {
            if offset + total < part_start + part.cap {
                // This part overlaps the remaining write region.
                let written = part.write_at(offset + total - part_start, &src[total..])?;
                total += written;
                if total == src.len() {
                    break;
                }
            } else {
                // This part lies wholly before the write offset; mark it fully
                // used so the skipped region is accounted for in `len()`.
                part.len = part.cap;
            }
            part_start += part.cap;
        }
        assert_eq!(total, src.len());
        Ok(())
    }

    /// Reads bytes from the buffer.
    ///
    /// Reads `dst.len()` bytes from the buffer beginning at `offset`. Returns
    /// [`Error::TooSmall`] if fewer than `offset + dst.len()` bytes are used
    /// in the buffer; otherwise `dst` is filled entirely (a partial read is a
    /// bug and would trip the assertion below).
    pub fn read_at(&self, offset: usize, dst: &mut [u8]) -> Result<()> {
        if self.len() < offset + dst.len() {
            return Err(Error::TooSmall { size: self.len(), offset, length: dst.len() });
        }
        // Mirrors `write_at`: walk the parts, skipping those entirely before
        // `offset` and copying out of the ones that overlap the read region.
        let mut part_start = 0;
        let mut total = 0;
        for part in self.parts.iter() {
            if offset + total < part_start + part.cap {
                let read = part.read_at(offset + total - part_start, &mut dst[total..])?;
                total += read;
                if total == dst.len() {
                    break;
                }
            }
            part_start += part.cap;
        }
        assert_eq!(total, dst.len());
        Ok(())
    }

    /// Returns this buffer as a mutable slice if it's not fragmented.
    ///
    /// Returns `None` when the buffer spans more than one part, since the
    /// parts are not guaranteed to be contiguous in memory.
    pub fn as_slice_mut(&mut self) -> Option<&mut [u8]> {
        match &mut (*self.parts)[..] {
            [] => Some(&mut []),
            [one] => Some(one.as_slice_mut()),
            _ => None,
        }
    }

    /// Pads the [`Buffer`] to minimum tx buffer length requirements.
    pub(in crate::session) fn pad(&mut self) -> Result<()> {
        let num_parts = self.parts.len();
        let BufferLayout { min_tx_tail, min_tx_data, min_tx_head: _, length: _ } =
            self.alloc.pool.buffer_layout;
        // `target` is how many more bytes of data length are still required;
        // each part's `pad` returns how much of the target it absorbed.
        let mut target = min_tx_data;
        for (i, part) in self.parts.iter_mut().enumerate() {
            // Only the last part may grow beyond its current capacity, by
            // eating into its tail region down to `min_tx_tail`.
            let grow_cap = if i == num_parts - 1 {
                let descriptor =
                    self.alloc.descriptors().last().expect("descriptor must not be empty");
                let data_length = descriptor.data_length();
                let tail_length = descriptor.tail_length();
                // data_length + tail_length <= buffer_length <= usize::MAX.
                let rest = usize::try_from(data_length).unwrap() + usize::from(tail_length);
                match rest.checked_sub(usize::from(min_tx_tail)) {
                    Some(grow_cap) => Some(grow_cap),
                    None => break,
                }
            } else {
                None
            };
            target -= part.pad(target, grow_cap)?;
        }
        if target != 0 {
            return Err(Error::Pad(min_tx_data, self.cap()));
        }
        Ok(())
    }

    /// Leaks the underlying buffer descriptors to the driver.
    ///
    /// Returns the head of the leaked allocation.
    ///
    /// Emptying `alloc.descs` first prevents the `AllocGuard` drop impl from
    /// returning the descriptors to the pool — the driver owns them now.
    pub(in crate::session) fn leak(mut self) -> DescId<K> {
        let descs = std::mem::replace(&mut self.alloc.descs, Chained::empty());
        descs.into_iter().next().unwrap()
    }

    /// Retrieves the frame type of the buffer.
    pub fn frame_type(&self) -> Result<netdev::FrameType> {
        self.alloc.descriptor().frame_type()
    }

    /// Retrieves the buffer's source port.
    pub fn port(&self) -> Port {
        self.alloc.descriptor().port()
    }
}
525
526impl Buffer<Tx> {
527    /// Commits the metadata for the buffer to descriptors.
528    pub(in crate::session) fn commit(&mut self) {
529        for (part, mut descriptor) in self.parts.iter_mut().zip(self.alloc.descriptors_mut()) {
530            // The following unwrap is safe because part.len must be smaller than
531            // buffer_length, which is a u32.
532            descriptor.commit(u32::try_from(part.len).unwrap())
533        }
534    }
535
536    /// Sets the buffer's destination port.
537    pub fn set_port(&mut self, port: Port) {
538        self.alloc.descriptor_mut().set_port(port)
539    }
540
541    /// Sets the frame type of the buffer.
542    pub fn set_frame_type(&mut self, frame_type: netdev::FrameType) {
543        self.alloc.descriptor_mut().set_frame_type(frame_type)
544    }
545
546    /// Sets TxFlags of a Tx buffer.
547    pub fn set_tx_flags(&mut self, flags: netdev::TxFlags) {
548        self.alloc.descriptor_mut().set_tx_flags(flags)
549    }
550}
551
552impl Buffer<Rx> {
553    /// Turns an rx buffer into a tx one.
554    pub async fn into_tx(self) -> Buffer<Tx> {
555        let Buffer { alloc, parts, pos } = self;
556        Buffer { alloc: alloc.into_tx().await, parts, pos }
557    }
558
559    /// Retrieves RxFlags of an Rx Buffer.
560    pub fn rx_flags(&self) -> Result<netdev::RxFlags> {
561        self.alloc.descriptor().rx_flags()
562    }
563}
564
impl AllocGuard<Rx> {
    /// Turns an rx allocation into a tx one.
    ///
    /// A fresh tx allocation with the same chain length is obtained and its
    /// descriptor ids are swapped with ours. When `self` is subsequently
    /// dropped, the descriptors it now holds (originally tx ones) are
    /// returned to the rx pool, compensating for the rx descriptors that we
    /// converted into the returned tx allocation.
    async fn into_tx(mut self) -> AllocGuard<Tx> {
        let mut tx = self.pool.alloc_tx(self.descs.len).await;
        // [MaybeUninit<DescId<Tx>; 4] and [MaybeUninit<DescId<Rx>; 4] have the
        // same memory layout because DescId is repr(transparent). So it is safe
        // to transmute and swap the values between the storages. After the swap
        // the drop implementation of self will return the descriptors back to
        // rx pool.
        std::mem::swap(&mut self.descs.storage, unsafe {
            std::mem::transmute(&mut tx.descs.storage)
        });
        tx
    }
}
584
585/// A non-empty container that has at most [`netdev::MAX_DESCRIPTOR_CHAIN`] elements.
/// A non-empty container that has at most [`netdev::MAX_DESCRIPTOR_CHAIN`] elements.
struct Chained<T> {
    /// Backing storage; only `storage[..len]` is initialized.
    storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
    /// Number of initialized elements at the front of `storage`.
    len: ChainLength,
}
590
impl<T> Deref for Chained<T> {
    type Target = [T];

    /// Views the initialized prefix as a slice.
    fn deref(&self) -> &Self::Target {
        // Safety: `self.storage[..self.len]` is already initialized, and
        // `MaybeUninit<T>` has the same layout as `T`.
        unsafe { std::mem::transmute(&self.storage[..self.len.into()]) }
    }
}
599
impl<T> DerefMut for Chained<T> {
    /// Views the initialized prefix as a mutable slice.
    fn deref_mut(&mut self) -> &mut Self::Target {
        // Safety: `self.storage[..self.len]` is already initialized, and
        // `MaybeUninit<T>` has the same layout as `T`.
        unsafe { std::mem::transmute(&mut self.storage[..self.len.into()]) }
    }
}
606
impl<T> Drop for Chained<T> {
    /// Drops the initialized elements; `MaybeUninit` itself never runs drops.
    fn drop(&mut self) {
        // Safety: `self.deref_mut()` is a slice of all initialized elements.
        unsafe {
            std::ptr::drop_in_place(self.deref_mut());
        }
    }
}
615
616impl<T: Debug> Debug for Chained<T> {
617    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
618        f.debug_list().entries(self.iter()).finish()
619    }
620}
621
impl<T> Chained<T> {
    /// Creates a `Chained` with zero initialized elements.
    ///
    /// Note this violates the "non-empty" documented invariant; it is only
    /// used internally as a placeholder (e.g. by `Buffer::leak`).
    #[allow(clippy::uninit_assumed_init)]
    fn empty() -> Self {
        // Create an uninitialized array of `MaybeUninit`. The `assume_init` is
        // safe because the type we are claiming to have initialized here is a
        // bunch of `MaybeUninit`s, which do not require initialization.
        // TODO(https://fxbug.dev/42160423): use MaybeUninit::uninit_array once it
        // is stablized.
        // https://doc.rust-lang.org/std/mem/union.MaybeUninit.html#method.uninit_array
        Self { storage: unsafe { MaybeUninit::uninit().assume_init() }, len: ChainLength::ZERO }
    }
}
634
impl<T> FromIterator<T> for Chained<T> {
    /// # Panics
    ///
    /// if the iterator is empty or the iterator can yield more than
    ///  MAX_DESCRIPTOR_CHAIN elements.
    fn from_iter<I: IntoIterator<Item = T>>(elements: I) -> Self {
        let mut result = Self::empty();
        let mut len = 0u8;
        for (idx, e) in elements.into_iter().enumerate() {
            // Indexing panics (bounds check on `storage`) if the iterator
            // yields more than MAX_DESCRIPTOR_CHAIN elements.
            result.storage[idx] = MaybeUninit::new(e);
            len += 1;
        }
        assert!(len > 0);
        // `len` can not be larger than `MAX_DESCRIPTOR_CHAIN`, otherwise we can't
        // get here due to the bound checks on `result.storage`.
        result.len = ChainLength::try_from(len).unwrap();
        result
    }
}
654
impl<T> IntoIterator for Chained<T> {
    type Item = T;
    type IntoIter = ChainedIter<T>;

    /// Transfers ownership of the initialized elements to a [`ChainedIter`].
    fn into_iter(mut self) -> Self::IntoIter {
        let len = self.len;
        self.len = ChainLength::ZERO;
        // Safety: we have reset the length to zero, it is now safe to move out
        // the values and set them to be uninitialized (our `Drop` will then
        // drop nothing). The `assume_init` is safe because the type we are
        // claiming to have initialized here is a bunch of `MaybeUninit`s,
        // which do not require initialization.
        // TODO(https://fxbug.dev/42160423): use MaybeUninit::uninit_array once it
        // is stablized.
        #[allow(clippy::uninit_assumed_init)]
        let storage =
            std::mem::replace(&mut self.storage, unsafe { MaybeUninit::uninit().assume_init() });
        ChainedIter { storage, len, consumed: 0 }
    }
}
674
/// Owning iterator over the elements of a [`Chained`].
struct ChainedIter<T> {
    /// Storage taken from the source `Chained`; `storage[consumed..len]`
    /// holds the elements not yet yielded.
    storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
    /// Number of initialized elements at the front of `storage`.
    len: ChainLength,
    /// How many elements have been yielded so far.
    consumed: u8,
}
680
impl<T> Iterator for ChainedIter<T> {
    type Item = T;

    /// Moves the next initialized element out of the storage.
    fn next(&mut self) -> Option<Self::Item> {
        if self.consumed < self.len.get() {
            // Safety: it is safe now to replace that slot with an uninitialized
            // value because we will advance consumed by 1, so the slot is
            // never read (or dropped) again.
            let value = unsafe {
                std::mem::replace(
                    &mut self.storage[usize::from(self.consumed)],
                    MaybeUninit::uninit(),
                )
                .assume_init()
            };
            self.consumed += 1;
            Some(value)
        } else {
            None
        }
    }

    /// Exact remaining length; enables the `ExactSizeIterator` impl below.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let len = usize::from(self.len.get() - self.consumed);
        (len, Some(len))
    }
}
707
// `size_hint` returns the exact remaining length, so the default
// `ExactSizeIterator::len` is correct.
impl<T> ExactSizeIterator for ChainedIter<T> {}
709
impl<T> Drop for ChainedIter<T> {
    /// Drops the elements that were never yielded.
    fn drop(&mut self) {
        // Safety: `self.storage[self.consumed..self.len]` is initialized, and
        // `MaybeUninit<T>` has the same layout as `T`.
        unsafe {
            std::ptr::drop_in_place(std::mem::transmute::<_, &mut [T]>(
                &mut self.storage[self.consumed.into()..self.len.into()],
            ));
        }
    }
}
720
/// Guards the allocated descriptors; they will be freed when dropped.
pub(in crate::session) struct AllocGuard<K: AllocKind> {
    /// The descriptor chain owned by this allocation.
    descs: Chained<DescId<K>>,
    /// Keeps the pool alive and is where the descriptors return on drop.
    pool: Arc<Pool>,
}
726
727impl<K: AllocKind> Debug for AllocGuard<K> {
728    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
729        let Self { descs, pool: _ } = self;
730        f.debug_struct("AllocGuard").field("descs", descs).finish()
731    }
732}
733
734impl<K: AllocKind> AllocGuard<K> {
735    fn new(descs: Chained<DescId<K>>, pool: Arc<Pool>) -> Self {
736        Self { descs, pool }
737    }
738
739    /// Iterates over references to the descriptors.
740    fn descriptors(&self) -> impl Iterator<Item = DescRef<'_, K>> + '_ {
741        self.descs.iter().map(move |desc| self.pool.descriptors.borrow(desc))
742    }
743
744    /// Iterates over mutable references to the descriptors.
745    fn descriptors_mut(&mut self) -> impl Iterator<Item = DescRefMut<'_, K>> + '_ {
746        let descriptors = &self.pool.descriptors;
747        self.descs.iter_mut().map(move |desc| descriptors.borrow_mut(desc))
748    }
749
750    /// Gets a reference to the head descriptor.
751    fn descriptor(&self) -> DescRef<'_, K> {
752        self.descriptors().next().expect("descriptors must not be empty")
753    }
754
755    /// Gets a mutable reference to the head descriptor.
756    fn descriptor_mut(&mut self) -> DescRefMut<'_, K> {
757        self.descriptors_mut().next().expect("descriptors must not be empty")
758    }
759}
760
impl AllocGuard<Tx> {
    /// Initializes descriptors of a tx allocation.
    ///
    /// Distributes `requested_bytes` of data capacity over the chain: the
    /// first descriptor reserves `min_tx_head` bytes of head room, the last
    /// reserves `min_tx_tail` bytes of tail room, and any capacity beyond the
    /// request is pushed into the last descriptor's tail so the caller cannot
    /// write more than requested. Returns [`Error::TxLength`] if the adjusted
    /// tail length cannot be represented in a u16.
    fn init(&mut self, mut requested_bytes: usize) -> Result<()> {
        let len = self.len();
        let BufferLayout { min_tx_head, min_tx_tail, length: buffer_length, min_tx_data: _ } =
            self.pool.buffer_layout;
        // `clen` counts down: the head of the chain stores the number of
        // descriptors that follow it, and the last one stores 0.
        for (mut descriptor, clen) in self.descriptors_mut().zip((0..len).rev()) {
            let chain_length = ChainLength::try_from(clen).unwrap();
            let head_length = if clen + 1 == len { min_tx_head } else { 0 };
            let mut tail_length = if clen == 0 { min_tx_tail } else { 0 };

            // NOTE(review): the original comment here was truncated; it
            // presumably stated that `buffer_length` is at least
            // head_length and tail_length. The check was done when the config
            // for pool was created, so the subtraction won't overflow.
            let available_bytes =
                u32::try_from(buffer_length - usize::from(head_length) - usize::from(tail_length))
                    .unwrap();

            let data_length = match u32::try_from(requested_bytes) {
                Ok(requested) => {
                    if requested < available_bytes {
                        // The requested bytes are less than what is available,
                        // we need to put the excess in the tail so that the
                        // user cannot write more than they requested.
                        tail_length = u16::try_from(available_bytes - requested)
                            .ok_checked::<TryFromIntError>()
                            .and_then(|tail_adjustment| tail_length.checked_add(tail_adjustment))
                            .ok_or(Error::TxLength)?;
                    }
                    requested.min(available_bytes)
                }
                // More than u32::MAX bytes still wanted: take everything this
                // descriptor can offer.
                Err(TryFromIntError { .. }) => available_bytes,
            };

            requested_bytes -=
                usize::try_from(data_length).unwrap_or_else(|TryFromIntError { .. }| {
                    panic!(
                        "data_length: {} must be smaller than requested_bytes: {}, which is a usize",
                        data_length, requested_bytes
                    )
                });
            descriptor.initialize(chain_length, head_length, data_length, tail_length);
        }
        // The chain was sized to cover the request, so nothing may remain.
        assert_eq!(requested_bytes, 0);
        Ok(())
    }
}
807
808impl<K: AllocKind> Drop for AllocGuard<K> {
809    fn drop(&mut self) {
810        if self.is_empty() {
811            return;
812        }
813        K::free(private::Allocation(self));
814    }
815}
816
817impl<K: AllocKind> Deref for AllocGuard<K> {
818    type Target = [DescId<K>];
819
820    fn deref(&self) -> &Self::Target {
821        self.descs.deref()
822    }
823}
824
/// A contiguous region of the buffer; corresponding to one descriptor.
///
/// [`BufferPart`] owns the memory range [ptr, ptr+cap).
struct BufferPart {
    /// The data region starts at `ptr`.
    ptr: *mut u8,
    /// The capacity for the region is `cap`.
    cap: usize,
    /// Used to indicate how many bytes are actually in the buffer, it
    /// starts as 0 for a tx buffer and as `cap` for a rx buffer. It will
    /// be used later as `data_length` in the descriptor.
    // NOTE(review): the accessors below maintain `len <= cap`; `new` does not
    // check it — callers are expected to uphold it.
    len: usize,
}
838
impl BufferPart {
    /// Creates a new [`BufferPart`] that owns the memory region.
    ///
    /// # Safety
    ///
    /// The caller must make sure the memory pointed by `ptr` lives longer than
    /// `BufferPart` being constructed. Once a BufferPart is constructed, it is
    /// assumed that the memory `[ptr..ptr+cap)` is always valid to read and
    /// write.
    unsafe fn new(ptr: *mut u8, cap: usize, len: usize) -> Self {
        Self { ptr, cap, len }
    }

    /// Reads bytes from this buffer part.
    ///
    /// Reads up to `dst.len()` bytes from the region beginning at `offset`,
    /// returning how many bytes were read successfully. Partial read is
    /// not considered as an error.
    ///
    /// Returns [`Error::Index`] if `offset` is beyond the readable length.
    fn read_at(&self, offset: usize, dst: &mut [u8]) -> Result<usize> {
        let available = self.len.checked_sub(offset).ok_or(Error::Index(offset, self.len))?;
        let to_copy = std::cmp::min(available, dst.len());
        // Safety: the source region is valid for read and the destination
        // region is valid for write; `to_copy` is bounded by both lengths.
        unsafe { std::ptr::copy_nonoverlapping(self.ptr.add(offset), dst.as_mut_ptr(), to_copy) }
        Ok(to_copy)
    }

    /// Writes bytes to this buffer part.
    ///
    /// Writes up to `src.len()` bytes into the region beginning at `offset`,
    /// returning how many bytes were written successfully. Partial write is
    /// not considered as an error.
    ///
    /// Returns [`Error::Index`] if `offset` is beyond the capacity.
    fn write_at(&mut self, offset: usize, src: &[u8]) -> Result<usize> {
        let available = self.cap.checked_sub(offset).ok_or(Error::Index(offset, self.cap))?;
        let to_copy = std::cmp::min(src.len(), available);
        // Safety: the source region is valid for read and the destination
        // region is valid for write; `to_copy` is bounded by both lengths.
        unsafe { std::ptr::copy_nonoverlapping(src.as_ptr(), self.ptr.add(offset), to_copy) }
        // Grow the readable length to cover the newly written bytes; a write
        // inside the existing length must not shrink it.
        self.len = std::cmp::max(self.len, offset + to_copy);
        Ok(to_copy)
    }

    /// Pads this part of buffer to have length `target`.
    ///
    /// `limit` describes the limit for this region to grow beyond capacity.
    /// `None` means the part is not allowed to grow and padding must be done
    /// within the existing capacity, `Some(limit)` means this part is allowed
    /// to extend its capacity up to the limit.
    ///
    /// Returns the resulting length. Note that with `limit = None` the
    /// result is clamped to the current capacity and may be less than
    /// `target`.
    fn pad(&mut self, target: usize, limit: Option<usize>) -> Result<usize> {
        if target <= self.len {
            // Already at least `target` bytes long; nothing to zero-fill.
            return Ok(target);
        }
        if let Some(limit) = limit {
            if target > limit {
                return Err(Error::Pad(target, self.cap));
            }
            if self.cap < target {
                self.cap = target
            }
        }
        let new_len = std::cmp::min(target, self.cap);
        // Zero-fill the gap between the current and the new length.
        // Safety: This is safe because the destination memory region is valid
        // for write.
        unsafe {
            std::ptr::write_bytes(self.ptr.add(self.len), 0, new_len - self.len);
        }
        self.len = new_len;
        Ok(new_len)
    }

    /// Returns the buffer part as a slice covering the readable `len` bytes.
    fn as_slice_mut(&mut self) -> &mut [u8] {
        // SAFETY: BufferPart requires the caller to guarantee the ptr used to
        // create outlives its instance. BufferPart itself is not copy or clone
        // so we can rely on unsafety of `new` to uphold this.
        unsafe { std::slice::from_raw_parts_mut(self.ptr, self.len) }
    }
}
917
// `Buffer` needs to be `Send` in order to be useful in async code. Instead
// of marking `Buffer` as `Send` directly, `BufferPart` is `Send` already
// and we can let the compiler do the deduction.
// SAFETY: `BufferPart` owns its `[ptr, ptr+cap)` region (see the type's
// docs), so moving it to another thread moves exclusive access with it.
unsafe impl Send for BufferPart {}
922
923impl Debug for BufferPart {
924    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
925        let BufferPart { len, cap, ptr } = &self;
926        f.debug_struct("BufferPart").field("ptr", ptr).field("len", len).field("cap", cap).finish()
927    }
928}
929
impl<K: AllocKind> From<AllocGuard<K>> for Buffer<K> {
    /// Materializes a [`Buffer`] from a descriptor allocation by creating one
    /// [`BufferPart`] per descriptor, pointing into the pool's data region.
    fn from(alloc: AllocGuard<K>) -> Self {
        let AllocGuard { pool, descs: _ } = &alloc;
        let parts: Chained<BufferPart> = alloc
            .descriptors()
            .map(|descriptor| {
                // The following unwraps are safe because they are already
                // checked in `DeviceInfo::config`.
                let offset = usize::try_from(descriptor.offset()).unwrap();
                let head_length = usize::from(descriptor.head_length());
                let data_length = usize::try_from(descriptor.data_length()).unwrap();
                // A tx buffer starts empty (nothing written yet); an rx
                // buffer starts full (the whole data region is readable).
                let len = match K::REFL {
                    AllocKindRefl::Tx => 0,
                    AllocKindRefl::Rx => data_length,
                };
                // Sanity check: make sure the layout is valid.
                assert!(
                    offset + head_length <= pool.bytes,
                    "buffer part starts beyond the end of pool"
                );
                assert!(
                    offset + head_length + data_length <= pool.bytes,
                    "buffer part ends beyond the end of pool"
                );
                // Safety: this is safe because the `AllocGuard` makes sure the
                // underlying memory is valid for the entire time when
                // `BufferPart` is alive; `add` is safe because
                // `offset + head_length` is within the allocation and
                // smaller than isize::MAX.
                unsafe {
                    BufferPart::new(pool.base.as_ptr().add(offset + head_length), data_length, len)
                }
            })
            .collect();
        Self { alloc, parts, pos: 0 }
    }
}
967
impl<K: AllocKind> Read for Buffer<K> {
    /// Reads from the buffer at the current cursor position, advancing the
    /// cursor on success.
    ///
    /// NOTE(review): on success this unconditionally reports `buf.len()`
    /// bytes read and discards whatever `read_at` returns; this is only
    /// correct if `Buffer::read_at` fails unless the full `buf.len()` bytes
    /// are available at `self.pos` — confirm against `read_at`'s definition.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        self.read_at(self.pos, buf)
            .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidInput, error))?;
        self.pos += buf.len();
        Ok(buf.len())
    }
}
976
impl Write for Buffer<Tx> {
    /// Writes to the buffer at the current cursor position, advancing the
    /// cursor on success.
    ///
    /// NOTE(review): on success this unconditionally reports `buf.len()`
    /// bytes written and discards whatever `write_at` returns; this is only
    /// correct if `Buffer::write_at` fails unless the full `buf.len()` bytes
    /// fit at `self.pos` — confirm against `write_at`'s definition.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.write_at(self.pos, buf)
            .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidInput, error))?;
        self.pos += buf.len();
        Ok(buf.len())
    }

    /// No-op: writes land directly in the shared pool memory.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
989
990impl<K: AllocKind> Seek for Buffer<K> {
991    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
992        let pos = match pos {
993            SeekFrom::Start(pos) => pos,
994            SeekFrom::End(offset) => {
995                let end = i64::try_from(self.cap())
996                    .map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
997                u64::try_from(end.wrapping_add(offset)).unwrap()
998            }
999            SeekFrom::Current(offset) => {
1000                let current = i64::try_from(self.pos)
1001                    .map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
1002                u64::try_from(current.wrapping_add(offset)).unwrap()
1003            }
1004        };
1005        self.pos =
1006            usize::try_from(pos).map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
1007        Ok(pos)
1008    }
1009}
1010
/// A pending tx allocation request.
struct TxAllocReq {
    /// Channel on which the fulfilled allocation is delivered; it is closed
    /// when the requesting future is dropped.
    sender: Sender<AllocGuard<Tx>>,
    /// The number of descriptors requested.
    size: ChainLength,
}
1016
1017impl TxAllocReq {
1018    fn new(size: ChainLength) -> (Self, Receiver<AllocGuard<Tx>>) {
1019        let (sender, receiver) = channel();
1020        (TxAllocReq { sender, size }, receiver)
1021    }
1022
1023    /// Fulfills the pending request with an `AllocGuard`.
1024    ///
1025    /// If the request is already closed, the guard is simply dropped and
1026    /// returned to the queue.
1027    ///
1028    /// `fulfill` must *not* be called when the `guard`'s pool is holding the tx
1029    /// lock, since we may deadlock/panic upon the double tx lock acquisition.
1030    fn fulfill(self, guard: AllocGuard<Tx>) {
1031        let Self { sender, size: _ } = self;
1032        match sender.send(guard) {
1033            Ok(()) => (),
1034            Err(guard) => {
1035                // It's ok to just drop the guard here, it'll be returned to the
1036                // pool.
1037                drop(guard);
1038            }
1039        }
1040    }
1041}
1042
/// A module for sealed traits so that the user of this crate can not implement
/// [`AllocKind`] for anything other than [`Rx`] and [`Tx`].
mod private {
    use super::{AllocKind, Rx, Tx};
    pub trait Sealed: 'static + Sized {}
    impl Sealed for Rx {}
    impl Sealed for Tx {}

    // We can't leak a private type in a public trait, create an opaque private
    // new type for &mut super::AllocGuard so that we can mention it in the
    // AllocKind trait.
    pub struct Allocation<'a, K: AllocKind>(pub(super) &'a mut super::AllocGuard<K>);
}
1056
/// An allocation can have two kinds, this trait provides a way to project a
/// type ([`Rx`] or [`Tx`]) into a value.
pub trait AllocKind: private::Sealed {
    /// The reflected value of Self.
    const REFL: AllocKindRefl;

    /// Frees an allocation of the given kind, returning its descriptors to
    /// the pool.
    fn free(alloc: private::Allocation<'_, Self>);
}
1066
/// An uninhabited tag type relating types to Tx allocations.
pub enum Tx {}
/// An uninhabited tag type relating types to Rx allocations.
pub enum Rx {}

/// The reflected value that allows inspection on an [`AllocKind`] type.
pub enum AllocKindRefl {
    Tx,
    Rx,
}
1077
1078impl AllocKindRefl {
1079    pub(in crate::session) fn as_str(&self) -> &'static str {
1080        match self {
1081            AllocKindRefl::Tx => "Tx",
1082            AllocKindRefl::Rx => "Rx",
1083        }
1084    }
1085}
1086
impl AllocKind for Tx {
    const REFL: AllocKindRefl = AllocKindRefl::Tx;

    fn free(alloc: private::Allocation<'_, Self>) {
        let private::Allocation(AllocGuard { pool, descs }) = alloc;
        // Take the descriptors out of the guard so its `Drop` sees an empty
        // allocation and doesn't try to free twice.
        pool.free_tx(std::mem::replace(descs, Chained::empty()));
    }
}

impl AllocKind for Rx {
    const REFL: AllocKindRefl = AllocKindRefl::Rx;

    fn free(alloc: private::Allocation<'_, Self>) {
        let private::Allocation(AllocGuard { pool, descs }) = alloc;
        // Take the descriptors out of the guard so its `Drop` sees an empty
        // allocation and doesn't try to free twice.
        pool.free_rx(std::mem::replace(descs, Chained::empty()));
        // Every freed rx buffer counts as a completed rx frame for lease
        // handling.
        pool.rx_leases.rx_complete();
    }
}
1105
/// An extracted struct containing state pertaining to watching rx leases.
pub(in crate::session) struct RxLeaseHandlingState {
    /// True until the single allowed [`RxLeaseWatcher`] claims this state.
    can_watch_rx_leases: AtomicBool,
    /// Keeps a rolling counter of received rx frames MINUS the target frame
    /// number of the current outstanding lease.
    ///
    /// When no leases are pending (via [`RxLeaseWatcher::wait_until`]),
    /// then this matches exactly the number of received frames.
    ///
    /// Otherwise, the lease is currently waiting for remaining `u64::MAX -
    /// rx_frame_counter` frames. The logic depends on `AtomicU64` wrapping
    /// around as part of completing rx buffers.
    rx_frame_counter: AtomicU64,
    /// Woken when the counter wraps around, i.e. when a pending lease wait
    /// becomes satisfiable.
    rx_lease_waker: AtomicWaker,
}
1121
impl RxLeaseHandlingState {
    /// Builds state from session flags; lease watching is enabled iff
    /// `RECEIVE_RX_POWER_LEASES` is set.
    fn new_with_flags(flags: netdev::SessionFlags) -> Self {
        Self::new_with_enabled(flags.contains(netdev::SessionFlags::RECEIVE_RX_POWER_LEASES))
    }

    /// Builds state with lease watching explicitly enabled or disabled.
    fn new_with_enabled(enabled: bool) -> Self {
        Self {
            can_watch_rx_leases: AtomicBool::new(enabled),
            rx_frame_counter: AtomicU64::new(0),
            rx_lease_waker: AtomicWaker::new(),
        }
    }

    /// Increments the total receive frame counter and possibly wakes up a
    /// waiting lease yielder.
    fn rx_complete(&self) {
        let Self { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } = self;
        let prev = rx_frame_counter.fetch_add(1, atomic::Ordering::SeqCst);

        // See wait_until for details. We need to hit a waker whenever our add
        // wrapped the u64 back around to 0.
        if prev == u64::MAX {
            rx_lease_waker.wake();
        }
    }
}
1148
/// A trait allowing [`RxLeaseWatcher`] to be agnostic over how to get an
/// [`RxLeaseHandlingState`].
pub(in crate::session) trait RxLeaseHandlingStateContainer {
    /// Returns the lease-handling state held by this container.
    fn lease_handling_state(&self) -> &RxLeaseHandlingState;
}

// Anything that can borrow a state (e.g. `&RxLeaseHandlingState`) is a
// container.
impl<T: Borrow<RxLeaseHandlingState>> RxLeaseHandlingStateContainer for T {
    fn lease_handling_state(&self) -> &RxLeaseHandlingState {
        self.borrow()
    }
}

// A pool carries its own lease-handling state.
impl RxLeaseHandlingStateContainer for Arc<Pool> {
    fn lease_handling_state(&self) -> &RxLeaseHandlingState {
        &self.rx_leases
    }
}
1166
/// A type safe-wrapper around a single lease watcher per `Pool`.
pub(in crate::session) struct RxLeaseWatcher<T> {
    /// Container providing the shared [`RxLeaseHandlingState`].
    state: T,
}
1171
impl<T: RxLeaseHandlingStateContainer> RxLeaseWatcher<T> {
    /// Creates a new lease watcher.
    ///
    /// # Panics
    ///
    /// Panics if an [`RxLeaseWatcher`] has already been created for the given
    /// pool or the pool was not configured for it.
    pub(in crate::session) fn new(state: T) -> Self {
        // `swap(false)` atomically claims the single watcher slot: only the
        // first caller on an enabled state observes `true`.
        assert!(
            state.lease_handling_state().can_watch_rx_leases.swap(false, atomic::Ordering::SeqCst),
            "can't watch rx leases"
        );
        Self { state }
    }

    /// Called by sessions to wait until `hold_until_frame` is fulfilled to
    /// yield leases out.
    ///
    /// Blocks until `hold_until_frame`-th rx buffer has been released.
    ///
    /// Note that this method takes `&mut self` because only one
    /// [`RxLeaseWatcher`] may be created by lease handling state, and exclusive
    /// access to it is required to watch lease completion.
    pub(in crate::session) async fn wait_until(&mut self, hold_until_frame: u64) {
        // A note about wrap-arounds.
        //
        // We're assuming the frame counter will never wrap around for
        // correctness here. This should be fine, even assuming a packet
        // rate of 1 million pps it'd take almost 600k years for this counter
        // to wrap around:
        // - 2^64 / 1e6 / 60 / 60 / 24 / 365 ~ 584e3.

        let RxLeaseHandlingState { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } =
            self.state.lease_handling_state();

        let prev = rx_frame_counter.fetch_sub(hold_until_frame, atomic::Ordering::SeqCst);
        // After having subtracted the waiting value we *must always restore the
        // value* on return, even if the future is not polled to completion.
        let _guard = scopeguard::guard((), |()| {
            let _: u64 = rx_frame_counter.fetch_add(hold_until_frame, atomic::Ordering::SeqCst);
        });

        // Lease is ready to be fulfilled.
        if prev >= hold_until_frame {
            return;
        }
        // Threshold is a wrapped around subtraction. So now we must wait
        // until the read value from the atomic is LESS THAN the threshold.
        let threshold = prev.wrapping_sub(hold_until_frame);
        futures::future::poll_fn(|cx| {
            let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
            if v < threshold {
                return Poll::Ready(());
            }
            // Re-check after registering the waker so a counter update racing
            // with registration cannot be missed.
            rx_lease_waker.register(cx.waker());
            let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
            if v < threshold {
                return Poll::Ready(());
            }
            Poll::Pending
        })
        .await;
    }
}
1236
1237#[cfg(test)]
1238mod tests {
1239    use super::*;
1240
1241    use assert_matches::assert_matches;
1242    use fuchsia_async as fasync;
1243    use futures::future::FutureExt;
1244    use test_case::test_case;
1245
1246    use std::collections::HashSet;
1247    use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize};
1248    use std::pin::pin;
1249    use std::task::{Poll, Waker};
1250
    const DEFAULT_MIN_TX_BUFFER_HEAD: u16 = 4;
    const DEFAULT_MIN_TX_BUFFER_TAIL: u16 = 8;
    // These `unwrap()`s are evaluated in const context: a zero value fails
    // the build rather than panicking at runtime.
    const DEFAULT_BUFFER_LENGTH: NonZeroUsize = NonZeroUsize::new(64).unwrap();
    const DEFAULT_TX_BUFFERS: NonZeroU16 = NonZeroU16::new(8).unwrap();
    const DEFAULT_RX_BUFFERS: NonZeroU16 = NonZeroU16::new(8).unwrap();
    // Largest single tx buffer: a maximally-chained buffer minus the
    // mandatory head and tail reservations.
    const MAX_BUFFER_BYTES: usize = DEFAULT_BUFFER_LENGTH.get()
        * netdev::MAX_DESCRIPTOR_CHAIN as usize
        - DEFAULT_MIN_TX_BUFFER_HEAD as usize
        - DEFAULT_MIN_TX_BUFFER_TAIL as usize;

    // Byte patterns used by the tests below to verify reads, writes and
    // padding.
    const SENTINEL_BYTE: u8 = 0xab;
    const WRITE_BYTE: u8 = 1;
    const PAD_BYTE: u8 = 0;

    // Test pool layout: 8 rx + 8 tx buffers of 64 bytes each.
    const DEFAULT_CONFIG: Config = Config {
        buffer_stride: NonZeroU64::new(DEFAULT_BUFFER_LENGTH.get() as u64).unwrap(),
        num_rx_buffers: DEFAULT_RX_BUFFERS,
        num_tx_buffers: DEFAULT_TX_BUFFERS,
        options: netdev::SessionFlags::empty(),
        buffer_layout: BufferLayout {
            length: DEFAULT_BUFFER_LENGTH.get(),
            min_tx_head: DEFAULT_MIN_TX_BUFFER_HEAD,
            min_tx_tail: DEFAULT_MIN_TX_BUFFER_TAIL,
            min_tx_data: 0,
        },
    };
1278
    impl Pool {
        /// Creates a pool from `DEFAULT_CONFIG`, discarding the companion
        /// outputs of `Pool::new` that these tests don't need.
        fn new_test_default() -> Arc<Self> {
            let (pool, _descriptors, _data) =
                Pool::new(DEFAULT_CONFIG).expect("failed to create default pool");
            pool
        }

        /// `alloc_tx` taking a raw `u8` chain length; panics if `n` is not a
        /// valid [`ChainLength`].
        async fn alloc_tx_checked(self: &Arc<Self>, n: u8) -> AllocGuard<Tx> {
            self.alloc_tx(ChainLength::try_from(n).expect("failed to convert to chain length"))
                .await
        }

        /// Non-blocking `alloc_tx`: `None` if the allocation would block.
        fn alloc_tx_now_or_never(self: &Arc<Self>, n: u8) -> Option<AllocGuard<Tx>> {
            self.alloc_tx_checked(n).now_or_never()
        }

        /// Drains the pool by allocating chains of `n` descriptors until an
        /// allocation would block.
        fn alloc_tx_all(self: &Arc<Self>, n: u8) -> Vec<AllocGuard<Tx>> {
            std::iter::from_fn(|| self.alloc_tx_now_or_never(n)).collect()
        }

        /// Non-blocking `alloc_tx_buffer`; panics on invalid arguments.
        fn alloc_tx_buffer_now_or_never(self: &Arc<Self>, num_bytes: usize) -> Option<Buffer<Tx>> {
            self.alloc_tx_buffer(num_bytes)
                .now_or_never()
                .transpose()
                .expect("invalid arguments for alloc_tx_buffer")
        }

        /// Overrides the minimum tx data length; requires the test to hold
        /// the only strong reference to the pool.
        fn set_min_tx_buffer_length(self: &mut Arc<Self>, length: usize) {
            Arc::get_mut(self).unwrap().buffer_layout.min_tx_data = length;
        }

        /// Fills the whole pool data region with `SENTINEL_BYTE`.
        fn fill_sentinel_bytes(&mut self) {
            // Safety: We have mut reference to Pool, so we get to modify the
            // VMO pointed by self.base.
            unsafe { std::ptr::write_bytes(self.base.as_ptr(), SENTINEL_BYTE, self.bytes) };
        }
    }
1316
    impl Buffer<Tx> {
        // Write a byte at offset, the result buffer should be pad_size long, with
        // 0..offset being the SENTINEL_BYTE, offset being the WRITE_BYTE and the
        // rest being PAD_BYTE.
        fn check_write_and_pad(&mut self, offset: usize, pad_size: usize) {
            self.write_at(offset, &[WRITE_BYTE][..]).expect("failed to write to self");
            self.pad().expect("failed to pad");
            assert_eq!(self.len(), pad_size);
            // An arbitrary value that is not SENTINEL/WRITE/PAD_BYTE so that
            // we can make sure the write really happened.
            const INIT_BYTE: u8 = 42;
            let mut read_buf = vec![INIT_BYTE; pad_size];
            self.read_at(0, &mut read_buf[..]).expect("failed to read from self");
            for (idx, byte) in read_buf.iter().enumerate() {
                if idx < offset {
                    assert_eq!(*byte, SENTINEL_BYTE);
                } else if idx == offset {
                    assert_eq!(*byte, WRITE_BYTE);
                } else {
                    assert_eq!(*byte, PAD_BYTE);
                }
            }
        }
    }

    // Test-only convenience: compare a descriptor chain against any iterable
    // of raw `u16` ids, element by element.
    impl<K, I, T> PartialEq<T> for Chained<DescId<K>>
    where
        K: AllocKind,
        I: ExactSizeIterator<Item = u16>,
        T: Copy + IntoIterator<IntoIter = I>,
    {
        fn eq(&self, other: &T) -> bool {
            let iter = other.into_iter();
            // Lengths must match before comparing element-wise.
            if usize::from(self.len) != iter.len() {
                return false;
            }
            self.iter().zip(iter).all(|(l, r)| l.get() == r)
        }
    }
1356
1357    impl Debug for TxAllocReq {
1358        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1359            let TxAllocReq { sender: _, size } = self;
1360            f.debug_struct("TxAllocReq").field("size", &size).finish_non_exhaustive()
1361        }
1362    }
1363
    // Every single-descriptor tx allocation must hand out a distinct
    // descriptor id.
    #[test]
    fn alloc_tx_distinct() {
        let pool = Pool::new_test_default();
        let allocated = pool.alloc_tx_all(1);
        assert_eq!(allocated.len(), DEFAULT_TX_BUFFERS.get().into());
        let distinct = allocated
            .iter()
            .map(|alloc| {
                assert_eq!(alloc.descs.len(), 1);
                alloc.descs[0].get()
            })
            .collect::<HashSet<u16>>();
        // Collecting into a set must not lose any ids.
        assert_eq!(allocated.len(), distinct.len());
    }

    // Descriptors return to the free list when allocations are dropped.
    #[test]
    fn alloc_tx_free_len() {
        let pool = Pool::new_test_default();
        {
            let allocated = pool.alloc_tx_all(2);
            assert_eq!(
                allocated.iter().fold(0, |acc, a| { acc + a.descs.len() }),
                DEFAULT_TX_BUFFERS.get().into()
            );
            // All descriptors are handed out while the allocations live.
            assert_eq!(pool.tx_alloc_state.lock().free_list.len, 0);
        }
        assert_eq!(pool.tx_alloc_state.lock().free_list.len, DEFAULT_TX_BUFFERS.get());
    }

    // Chained allocations consume multiple descriptors: a too-large request
    // fails while a smaller one can be satisfied from the remainder.
    #[test]
    fn alloc_tx_chain() {
        let pool = Pool::new_test_default();
        let allocated = pool.alloc_tx_all(3);
        assert_eq!(allocated.len(), usize::from(DEFAULT_TX_BUFFERS.get()) / 3);
        assert_matches!(pool.alloc_tx_now_or_never(3), None);
        assert_matches!(pool.alloc_tx_now_or_never(2), Some(a) if a.descs.len() == 2);
    }

    // `alloc_tx_buffers` yields one buffer per free descriptor, blocks when
    // the pool is exhausted, and its iterator is fused.
    #[test]
    fn alloc_tx_many() {
        let pool = Pool::new_test_default();
        let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
            - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
            - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
        let data_len = usize::try_from(data_len).unwrap();
        let mut buffers = pool
            .alloc_tx_buffers(data_len)
            .now_or_never()
            .expect("failed to alloc")
            .unwrap()
            // Collect into a vec so we keep the buffers alive, otherwise they
            // are immediately returned to the pool.
            .collect::<Result<Vec<_>>>()
            .expect("buffer error");
        assert_eq!(buffers.len(), DEFAULT_TX_BUFFERS.get().into());

        // We have all the buffers, which means allocating more should not
        // resolve.
        assert!(pool.alloc_tx_buffers(data_len).now_or_never().is_none());

        // If we release a single buffer we should be able to retrieve it again.
        assert_matches!(buffers.pop(), Some(_));
        let mut more_buffers =
            pool.alloc_tx_buffers(data_len).now_or_never().expect("failed to alloc").unwrap();
        let buffer = assert_matches!(more_buffers.next(), Some(Ok(b)) => b);
        assert_matches!(more_buffers.next(), None);
        // The iterator is fused, so None is yielded even after dropping the
        // buffer.
        drop(buffer);
        assert_matches!(more_buffers.next(), None);
    }

    // Freeing allocations makes their descriptors immediately reusable for a
    // previously unsatisfiable chained request.
    #[test]
    fn alloc_tx_after_free() {
        let pool = Pool::new_test_default();
        let mut allocated = pool.alloc_tx_all(1);
        assert_matches!(pool.alloc_tx_now_or_never(2), None);
        {
            let _drained = allocated.drain(..2);
        }
        assert_matches!(pool.alloc_tx_now_or_never(2), Some(a) if a.descs.len() == 2);
    }
1446
1447    #[test]
1448    fn blocking_alloc_tx() {
1449        let mut executor = fasync::TestExecutor::new();
1450        let pool = Pool::new_test_default();
1451        let mut allocated = pool.alloc_tx_all(1);
1452        let alloc_fut = pool.alloc_tx_checked(1);
1453        let mut alloc_fut = pin!(alloc_fut);
1454        // The allocation should block.
1455        assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
1456        // And the allocation request should be queued.
1457        assert!(!pool.tx_alloc_state.lock().requests.is_empty());
1458        let freed = allocated
1459            .pop()
1460            .expect("no fulfulled allocations")
1461            .iter()
1462            .map(|x| x.get())
1463            .collect::<Chained<_>>();
1464        let same_as_freed =
1465            |descs: &Chained<DescId<Tx>>| descs.iter().map(|x| x.get()).eq(freed.iter().copied());
1466        // Now the task should be able to continue.
1467        assert_matches!(
1468            &executor.run_until_stalled(&mut alloc_fut),
1469            Poll::Ready(AllocGuard{ descs, pool: _ }) if same_as_freed(descs)
1470        );
1471        // And the queued request should now be removed.
1472        assert!(pool.tx_alloc_state.lock().requests.is_empty());
1473    }
1474
    // Dropping a blocked allocation future before any descriptor is freed
    // cancels its queued request; a later free prunes the cancelled requests
    // instead of fulfilling them.
    #[test]
    fn blocking_alloc_tx_cancel_before_free() {
        let mut executor = fasync::TestExecutor::new();
        let pool = Pool::new_test_default();
        let mut allocated = pool.alloc_tx_all(1);
        {
            let alloc_fut = pool.alloc_tx_checked(1);
            let mut alloc_fut = pin!(alloc_fut);
            assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
            // Two queued requests: the one left behind by `alloc_tx_all`'s
            // final (failed) attempt and the one from `alloc_fut` above.
            assert_matches!(
                pool.tx_alloc_state.lock().requests.as_slices(),
                (&[ref req1, ref req2], &[]) if req1.size.get() == 1 && req2.size.get() == 1
            );
        }
        assert_matches!(
            allocated.pop(),
            Some(AllocGuard { ref descs, pool: ref p })
                if descs == &[DEFAULT_TX_BUFFERS.get() - 1] && Arc::ptr_eq(p, &pool)
        );
        // The freed descriptor lands on the free list and the cancelled
        // requests are gone.
        let state = pool.tx_alloc_state.lock();
        assert_eq!(state.free_list.len, 1);
        assert!(state.requests.is_empty());
    }

    // Same as above, but the descriptor is freed while the blocked future is
    // still alive and then the future is dropped without being polled again.
    #[test]
    fn blocking_alloc_tx_cancel_after_free() {
        let mut executor = fasync::TestExecutor::new();
        let pool = Pool::new_test_default();
        let mut allocated = pool.alloc_tx_all(1);
        {
            let alloc_fut = pool.alloc_tx_checked(1);
            let mut alloc_fut = pin!(alloc_fut);
            assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
            // Two queued requests: the one left behind by `alloc_tx_all`'s
            // final (failed) attempt and the one from `alloc_fut` above.
            assert_matches!(
                pool.tx_alloc_state.lock().requests.as_slices(),
                (&[ref req1, ref req2], &[]) if req1.size.get() == 1 && req2.size.get() == 1
            );
            assert_matches!(
                allocated.pop(),
                Some(AllocGuard { ref descs, pool: ref p })
                    if descs == &[DEFAULT_TX_BUFFERS.get() - 1] && Arc::ptr_eq(p, &pool)
            );
        }
        // The descriptor returns to the free list and no request remains.
        let state = pool.tx_alloc_state.lock();
        assert_eq!(state.free_list.len, 1);
        assert!(state.requests.is_empty());
    }
1522
    /// Exercises several concurrent blocking tx allocations and verifies that
    /// pending requests are kept sorted by descending size and that each
    /// request is fulfilled only once enough descriptors have been returned
    /// to the pool.
    #[test]
    fn multiple_blocking_alloc_tx_fulfill_order() {
        const TASKS_TOTAL: usize = 3;
        let mut executor = fasync::TestExecutor::new();
        let pool = Pool::new_test_default();
        // Drain the pool so that every allocation below has to block.
        let mut allocated = pool.alloc_tx_all(1);
        // Issue requests of sizes TASKS_TOTAL, TASKS_TOTAL-1, ..., 1.
        let mut alloc_futs = (1..=TASKS_TOTAL)
            .rev()
            .map(|x| {
                let pool = pool.clone();
                (x, Box::pin(async move { pool.alloc_tx_checked(x.try_into().unwrap()).await }))
            })
            .collect::<Vec<_>>();

        for (idx, (req_size, task)) in alloc_futs.iter_mut().enumerate() {
            assert_matches!(executor.run_until_stalled(task), Poll::Pending);
            // assert that the tasks are sorted decreasing on the requested size.
            assert_eq!(idx + *req_size, TASKS_TOTAL);
        }
        {
            let state = pool.tx_alloc_state.lock();
            // The first pending request was introduced by `alloc_tx_all`.
            assert_eq!(state.requests.len(), TASKS_TOTAL + 1);
            let mut requests = state.requests.iter();
            // It should already be cancelled because the requesting future is
            // already dropped.
            assert!(requests.next().unwrap().sender.is_canceled());
            // The rest of the requests must not be cancelled.
            assert!(requests.all(|req| !req.sender.is_canceled()))
        }

        // Return descriptors one at a time; each pending request (largest
        // first) must complete exactly when its requested amount is available.
        let mut to_free = Vec::new();
        let mut freed = 0;
        for free_size in (1..=TASKS_TOTAL).rev() {
            let (_req_size, mut task) = alloc_futs.remove(0);
            // Free one descriptor fewer than requested: still pending.
            for _ in 1..free_size {
                freed += 1;
                assert_matches!(
                    allocated.pop(),
                    Some(AllocGuard { ref descs, pool: ref p })
                        if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
                );
                assert_matches!(executor.run_until_stalled(&mut task), Poll::Pending);
            }
            // Free the last needed descriptor: the request must resolve.
            freed += 1;
            assert_matches!(
                allocated.pop(),
                Some(AllocGuard { ref descs, pool: ref p })
                    if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
            );
            match executor.run_until_stalled(&mut task) {
                Poll::Ready(alloc) => {
                    assert_eq!(alloc.len(), free_size);
                    // Don't return the allocation to the pool now.
                    to_free.push(alloc);
                }
                Poll::Pending => panic!("The request should be fulfilled"),
            }
            // The remaining requests cannot be fulfilled yet.
            for (_req_size, task) in alloc_futs.iter_mut() {
                assert_matches!(executor.run_until_stalled(task), Poll::Pending);
            }
        }
        assert!(pool.tx_alloc_state.lock().requests.is_empty());
    }
1588
1589    #[test]
1590    fn singleton_tx_layout() {
1591        let pool = Pool::new_test_default();
1592        let buffers = std::iter::from_fn(|| {
1593            let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1594                - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1595                - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1596            pool.alloc_tx_buffer_now_or_never(usize::try_from(data_len).unwrap()).map(|buffer| {
1597                assert_eq!(buffer.alloc.descriptors().count(), 1);
1598                let offset = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1599                    * u64::from(buffer.alloc[0].get());
1600                {
1601                    let descriptor = buffer.alloc.descriptor();
1602                    assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
1603                    assert_eq!(descriptor.head_length(), DEFAULT_MIN_TX_BUFFER_HEAD);
1604                    assert_eq!(descriptor.tail_length(), DEFAULT_MIN_TX_BUFFER_TAIL);
1605                    assert_eq!(descriptor.data_length(), data_len);
1606                    assert_eq!(descriptor.offset(), offset);
1607                }
1608
1609                assert_eq!(buffer.parts.len(), 1);
1610                let BufferPart { ptr, len, cap } = buffer.parts[0];
1611                assert_eq!(len, 0);
1612                assert_eq!(
1613                    // Using wrapping_add because we will never dereference the
1614                    // resulting pointer and it saves us an unsafe block.
1615                    pool.base.as_ptr().wrapping_add(
1616                        usize::try_from(offset).unwrap() + usize::from(DEFAULT_MIN_TX_BUFFER_HEAD),
1617                    ),
1618                    ptr
1619                );
1620                assert_eq!(data_len, u32::try_from(cap).unwrap());
1621                buffer
1622            })
1623        })
1624        .collect::<Vec<_>>();
1625        assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()));
1626    }
1627
    /// Allocates chained (4-descriptor) tx buffers until the pool runs dry and
    /// verifies per-descriptor layout: only the first descriptor reserves head
    /// space, only the last reserves tail space, chain lengths count down, and
    /// each non-terminal descriptor links to the next one in the allocation.
    #[test]
    fn chained_tx_layout() {
        let pool = Pool::new_test_default();
        // Spans exactly 4 buffers after accounting for head/tail reservations.
        let alloc_len = 4 * DEFAULT_BUFFER_LENGTH.get()
            - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
            - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL);
        let buffers = std::iter::from_fn(|| {
            pool.alloc_tx_buffer_now_or_never(alloc_len).map(|buffer| {
                assert_eq!(buffer.parts.len(), 4);
                for (idx, descriptor) in buffer.alloc.descriptors().enumerate() {
                    // Descriptors remaining in the chain after this one.
                    let chain_length = ChainLength::try_from(buffer.alloc.len() - idx - 1).unwrap();
                    // Only the first descriptor carries the head reservation.
                    let head_length = if idx == 0 { DEFAULT_MIN_TX_BUFFER_HEAD } else { 0 };
                    // Only the last descriptor carries the tail reservation.
                    let tail_length = if chain_length == ChainLength::ZERO {
                        DEFAULT_MIN_TX_BUFFER_TAIL
                    } else {
                        0
                    };
                    let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
                        - u32::from(head_length)
                        - u32::from(tail_length);
                    let offset = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
                        * u64::from(buffer.alloc[idx].get());
                    assert_eq!(descriptor.chain_length().unwrap(), chain_length);
                    assert_eq!(descriptor.head_length(), head_length);
                    assert_eq!(descriptor.tail_length(), tail_length);
                    assert_eq!(descriptor.offset(), offset);
                    assert_eq!(descriptor.data_length(), data_len);
                    if chain_length != ChainLength::ZERO {
                        assert_eq!(descriptor.nxt(), Some(buffer.alloc[idx + 1].get()));
                    }

                    let BufferPart { ptr, cap, len } = buffer.parts[idx];
                    assert_eq!(len, 0);
                    assert_eq!(
                        // Using wrapping_add because we will never dereference
                        // the resulting ptr and it saves us an unsafe block.
                        pool.base.as_ptr().wrapping_add(
                            usize::try_from(offset).unwrap() + usize::from(head_length),
                        ),
                        ptr
                    );
                    assert_eq!(data_len, u32::try_from(cap).unwrap());
                }
                buffer
            })
        })
        .collect::<Vec<_>>();
        assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()) / 4);
    }
1677
1678    #[test]
1679    fn rx_distinct() {
1680        let pool = Pool::new_test_default();
1681        let mut guard = pool.rx_pending.inner.lock();
1682        let (descs, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
1683        assert_eq!(descs.len(), usize::from(DEFAULT_RX_BUFFERS.get()));
1684        let distinct = descs.iter().map(|desc| desc.get()).collect::<HashSet<u16>>();
1685        assert_eq!(descs.len(), distinct.len());
1686    }
1687
1688    #[test]
1689    fn alloc_rx_layout() {
1690        let pool = Pool::new_test_default();
1691        let mut guard = pool.rx_pending.inner.lock();
1692        let (descs, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
1693        assert_eq!(descs.len(), usize::from(DEFAULT_RX_BUFFERS.get()));
1694        for desc in descs.iter() {
1695            let descriptor = pool.descriptors.borrow(desc);
1696            let offset =
1697                u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap() * u64::from(desc.get());
1698            assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
1699            assert_eq!(descriptor.head_length(), 0);
1700            assert_eq!(descriptor.tail_length(), 0);
1701            assert_eq!(descriptor.offset(), offset);
1702            assert_eq!(
1703                descriptor.data_length(),
1704                u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1705            );
1706        }
1707    }
1708
1709    #[test]
1710    fn buffer_read_at_write_at() {
1711        let pool = Pool::new_test_default();
1712        let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
1713        let mut buffer =
1714            pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
1715        // Because we have to accommodate the space for head and tail, there
1716        // would be 2 parts instead of 1.
1717        assert_eq!(buffer.parts.len(), 2);
1718        assert_eq!(buffer.cap(), alloc_bytes);
1719        let write_buf = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();
1720        buffer.write_at(0, &write_buf[..]).expect("failed to write into buffer");
1721        let mut read_buf = [0xff; DEFAULT_BUFFER_LENGTH.get()];
1722        buffer.read_at(0, &mut read_buf[..]).expect("failed to read from buffer");
1723        for (idx, byte) in read_buf.iter().enumerate() {
1724            assert_eq!(*byte, write_buf[idx]);
1725        }
1726    }
1727
1728    #[test]
1729    fn buffer_read_write_seek() {
1730        let pool = Pool::new_test_default();
1731        let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
1732        let mut buffer =
1733            pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
1734        // Because we have to accommodate the space for head and tail, there
1735        // would be 2 parts instead of 1.
1736        assert_eq!(buffer.parts.len(), 2);
1737        assert_eq!(buffer.cap(), alloc_bytes);
1738        let write_buf = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();
1739        assert_eq!(
1740            buffer.write(&write_buf[..]).expect("failed to write into buffer"),
1741            write_buf.len()
1742        );
1743        const SEEK_FROM_END: usize = 64;
1744        const READ_LEN: usize = 12;
1745        assert_eq!(
1746            buffer.seek(SeekFrom::End(-i64::try_from(SEEK_FROM_END).unwrap())).unwrap(),
1747            u64::try_from(buffer.cap() - SEEK_FROM_END).unwrap()
1748        );
1749        let mut read_buf = [0xff; READ_LEN];
1750        assert_eq!(
1751            buffer.read(&mut read_buf[..]).expect("failed to read from buffer"),
1752            read_buf.len()
1753        );
1754        assert_eq!(&write_buf[..READ_LEN], &read_buf[..]);
1755    }
1756
    /// Padding a buffer after a partial write must zero-fill up to `pad_size`,
    /// both when the pad fits in one part and when it spans several.
    #[test_case(32; "single buffer part")]
    #[test_case(MAX_BUFFER_BYTES; "multiple buffer parts")]
    fn buffer_pad(pad_size: usize) {
        let mut pool = Pool::new_test_default();
        pool.set_min_tx_buffer_length(pad_size);
        // Try padding after a write ending at every possible offset.
        for offset in 0..pad_size {
            // Refill the backing memory with sentinel bytes so stale data from
            // a previous iteration cannot mask a missing zero-fill.
            Arc::get_mut(&mut pool)
                .expect("there are multiple owners of the underlying VMO")
                .fill_sentinel_bytes();
            let mut buffer =
                pool.alloc_tx_buffer_now_or_never(pad_size).expect("failed to allocate buffer");
            buffer.check_write_and_pad(offset, pad_size);
        }
    }
1771
    /// Like `buffer_pad`, but the allocation is built from raw descriptors and
    /// the pad target spans multiple buffer parts, so padding must grow the
    /// buffer across the descriptor chain.
    #[test]
    fn buffer_pad_grow() {
        const BUFFER_PARTS: u8 = 3;
        let mut pool = Pool::new_test_default();
        // Pad target: three full buffers minus the head/tail reservations.
        let pad_size = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
            * u32::from(BUFFER_PARTS)
            - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
            - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
        pool.set_min_tx_buffer_length(pad_size.try_into().unwrap());
        for offset in 0..pad_size - u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap() {
            // Refill the backing memory with sentinel bytes so stale data from
            // a previous iteration cannot mask a missing zero-fill.
            Arc::get_mut(&mut pool)
                .expect("there are multiple owners of the underlying VMO")
                .fill_sentinel_bytes();
            let mut alloc =
                pool.alloc_tx_now_or_never(BUFFER_PARTS).expect("failed to alloc descriptors");
            alloc
                .init(
                    DEFAULT_BUFFER_LENGTH.get() * usize::from(BUFFER_PARTS)
                        - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
                        - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL),
                )
                .expect("head/body/tail sizes are representable with u16/u32/u16");
            let mut buffer = Buffer::try_from(alloc).unwrap();
            buffer.check_write_and_pad(offset.try_into().unwrap(), pad_size.try_into().unwrap());
        }
    }
1798
    /// Writes a single byte at `write_offset` into a maximally chained buffer
    /// and checks every part: parts entirely before the write are fully used,
    /// the part containing the byte ends right after it, and parts after it
    /// remain untouched.
    #[test_case(  0; "writes at the beginning")]
    #[test_case( 15; "writes in the first part")]
    #[test_case( 75; "writes in the second part")]
    #[test_case(135; "writes in the third part")]
    #[test_case(195; "writes in the last part")]
    fn buffer_used(write_offset: usize) {
        let pool = Pool::new_test_default();
        let mut buffer =
            pool.alloc_tx_buffer_now_or_never(MAX_BUFFER_BYTES).expect("failed to allocate buffer");
        // Expected capacity of each part: head is taken from the first part,
        // tail from the last; middle parts span a full buffer.
        let expected_caps = (0..netdev::MAX_DESCRIPTOR_CHAIN).map(|i| {
            if i == 0 {
                DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
            } else if i < netdev::MAX_DESCRIPTOR_CHAIN - 1 {
                DEFAULT_BUFFER_LENGTH.get()
            } else {
                DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL)
            }
        });
        assert_eq!(buffer.alloc.len(), netdev::MAX_DESCRIPTOR_CHAIN.into());
        buffer.write_at(write_offset, &[WRITE_BYTE][..]).expect("failed to write to buffer");
        // The accumulator is Some if we haven't found the part where the byte
        // was written, None if we've already found it.
        assert_eq!(
            buffer.parts.iter().zip(expected_caps).fold(
                Some(write_offset),
                |offset, (part, expected_cap)| {
                    // The cap must match the expectation.
                    assert_eq!(part.cap, expected_cap);

                    match offset {
                        Some(offset) => {
                            if offset >= expected_cap {
                                // The part should have used all the capacity.
                                assert_eq!(part.len, part.cap);
                                Some(offset - part.len)
                            } else {
                                // The part should end right after our byte.
                                assert_eq!(part.len, offset + 1);
                                let mut buf = [0];
                                // Verify that the byte is indeed written.
                                assert_matches!(part.read_at(offset, &mut buf), Ok(1));
                                assert_eq!(buf[0], WRITE_BYTE);
                                None
                            }
                        }
                        None => {
                            // We should have never written in this part.
                            assert_eq!(part.len, 0);
                            None
                        }
                    }
                }
            ),
            None
        )
    }
1855
1856    #[test]
1857    fn buffer_commit() {
1858        let pool = Pool::new_test_default();
1859        for offset in 0..MAX_BUFFER_BYTES {
1860            let mut buffer = pool
1861                .alloc_tx_buffer_now_or_never(MAX_BUFFER_BYTES)
1862                .expect("failed to allocate buffer");
1863            buffer.write_at(offset, &[1][..]).expect("failed to write to buffer");
1864            buffer.commit();
1865            for (part, descriptor) in buffer.parts.iter().zip(buffer.alloc.descriptors()) {
1866                let head_length = descriptor.head_length();
1867                let tail_length = descriptor.tail_length();
1868                let data_length = descriptor.data_length();
1869                assert_eq!(u32::try_from(part.len).unwrap(), data_length);
1870                assert_eq!(
1871                    u32::from(head_length + tail_length) + data_length,
1872                    u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap(),
1873                );
1874            }
1875        }
1876    }
1877
1878    #[test]
1879    fn allocate_under_device_minimum() {
1880        const MIN_TX_DATA: usize = 32;
1881        const ALLOC_SIZE: usize = 16;
1882        const WRITE_BYTE: u8 = 0xff;
1883        const WRITE_SENTINAL_BYTE: u8 = 0xee;
1884        const READ_SENTINAL_BYTE: u8 = 0xdd;
1885        let mut config = DEFAULT_CONFIG;
1886        config.buffer_layout.min_tx_data = MIN_TX_DATA;
1887        let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create a new pool");
1888        for mut buffer in Vec::from_iter(std::iter::from_fn({
1889            let pool = pool.clone();
1890            move || pool.alloc_tx_buffer_now_or_never(MIN_TX_DATA)
1891        })) {
1892            buffer.write_at(0, &[WRITE_SENTINAL_BYTE; MIN_TX_DATA]).expect("failed to write");
1893        }
1894        let mut allocated =
1895            pool.alloc_tx_buffer_now_or_never(16).expect("failed to allocate buffer");
1896        assert_eq!(allocated.cap(), ALLOC_SIZE);
1897        const WRITE_BUF_SIZE: usize = ALLOC_SIZE + 1;
1898        assert_matches!(
1899            allocated.write_at(0, &[WRITE_BYTE; WRITE_BUF_SIZE]),
1900            Err(Error::TooSmall { size: ALLOC_SIZE, offset: 0, length: WRITE_BUF_SIZE })
1901        );
1902        allocated.write_at(0, &[WRITE_BYTE; ALLOC_SIZE]).expect("failed to write to buffer");
1903        assert_matches!(allocated.pad(), Ok(()));
1904        assert_eq!(allocated.cap(), MIN_TX_DATA);
1905        assert_eq!(allocated.len(), MIN_TX_DATA);
1906        const READ_BUF_SIZE: usize = MIN_TX_DATA + 1;
1907        let mut read_buf = [READ_SENTINAL_BYTE; READ_BUF_SIZE];
1908        assert_matches!(
1909            allocated.read_at(0, &mut read_buf[..]),
1910            Err(Error::TooSmall { size: MIN_TX_DATA, offset: 0, length: READ_BUF_SIZE })
1911        );
1912        allocated.read_at(0, &mut read_buf[..MIN_TX_DATA]).expect("failed to read from buffer");
1913        assert_eq!(&read_buf[..ALLOC_SIZE], &[WRITE_BYTE; ALLOC_SIZE][..]);
1914        assert_eq!(&read_buf[ALLOC_SIZE..MIN_TX_DATA], &[0x0; ALLOC_SIZE][..]);
1915        assert_eq!(&read_buf[MIN_TX_DATA..], &[READ_SENTINAL_BYTE; 1][..]);
1916    }
1917
1918    #[test]
1919    fn invalid_tx_length() {
1920        let mut config = DEFAULT_CONFIG;
1921        config.buffer_layout.length = usize::from(u16::MAX) + 2;
1922        config.buffer_layout.min_tx_head = 0;
1923        let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create pool");
1924        assert_matches!(pool.alloc_tx_buffer(1).now_or_never(), Some(Err(Error::TxLength)));
1925    }
1926
    /// Verifies `RxLeaseWatcher::wait_until` against the rx frame counter: it
    /// resolves immediately when the counter already meets the threshold,
    /// stays pending until enough frames complete, and restores the counter
    /// when the future is dropped before completing.
    #[test]
    fn rx_leases() {
        let mut executor = fuchsia_async::TestExecutor::new();
        let state = RxLeaseHandlingState::new_with_enabled(true);
        let mut watcher = RxLeaseWatcher { state: &state };

        {
            // A threshold of 0 is trivially satisfied.
            let mut fut = pin!(watcher.wait_until(0));
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
        }
        {
            // One completed frame satisfies a threshold of 1.
            state.rx_complete();
            let mut fut = pin!(watcher.wait_until(1));
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
        }
        {
            let mut fut = pin!(watcher.wait_until(0));
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
        }
        {
            // Threshold 3: stays pending until two more frames complete.
            let mut fut = pin!(watcher.wait_until(3));
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
            state.rx_complete();
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
            state.rx_complete();
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
        }
        // Dropping the wait future without seeing it complete restores the
        // value.
        let counter_before = state.rx_frame_counter.load(atomic::Ordering::SeqCst);
        {
            let mut fut = pin!(watcher.wait_until(10000));
            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
        }
        let counter_after = state.rx_frame_counter.load(atomic::Ordering::SeqCst);
        assert_eq!(counter_before, counter_after);
    }
1964}