netdevice_client/session/buffer/pool.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Fuchsia netdevice buffer pool.
6
7use fuchsia_sync::Mutex;
8use futures::task::AtomicWaker;
9use std::borrow::Borrow;
10use std::collections::VecDeque;
11use std::convert::TryInto as _;
12use std::fmt::Debug;
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::mem::{ManuallyDrop, MaybeUninit};
15use std::num::TryFromIntError;
16use std::ops::{Deref, DerefMut};
17use std::ptr::NonNull;
18use std::sync::atomic::{self, AtomicBool, AtomicU64};
19use std::sync::Arc;
20use std::task::Poll;
21
22use explicit::ResultExt as _;
23use fidl_fuchsia_hardware_network as netdev;
24use fuchsia_runtime::vmar_root_self;
25use futures::channel::oneshot::{channel, Receiver, Sender};
26use zx::AsHandleRef as _;
27
28use super::{ChainLength, DescId, DescRef, DescRefMut, Descriptors};
29use crate::error::{Error, Result};
30use crate::session::{BufferLayout, Config, Pending, Port};
31
32/// Responsible for managing [`Buffer`]s for a [`Session`](crate::session::Session).
33pub(in crate::session) struct Pool {
34    /// Base address of the pool.
35    // Note: This field requires us to manually implement `Sync` and `Send`.
36    base: NonNull<u8>,
37    /// The length of the pool in bytes.
38    bytes: usize,
39    /// The descriptors allocated for the pool.
40    descriptors: Descriptors,
41    /// Shared state for allocation.
42    tx_alloc_state: Mutex<TxAllocState>,
43    /// The free rx descriptors pending to be sent to the driver.
44    pub(in crate::session) rx_pending: Pending<Rx>,
45    /// The buffer layout.
46    buffer_layout: BufferLayout,
47    /// State-keeping allowing sessions to handle rx leases.
48    rx_leases: RxLeaseHandlingState,
49}
50
51// `Pool` is `Send` and `Sync`, and this allows the compiler to deduce `Buffer`
52// to be `Send`. These impls are sound because it is safe to send `Pool` and to
53// share `&Pool` across threads: the implementation never allocates the same
54// buffer to two callers at the same time.
55unsafe impl Send for Pool {}
56unsafe impl Sync for Pool {}
57
58/// The shared state which keeps track of available buffers and tx buffers.
59struct TxAllocState {
60    /// All pending tx allocation requests.
61    requests: VecDeque<TxAllocReq>,
62    free_list: TxFreeList,
63}
64
65/// We use a linked list to maintain the tx free descriptors - they are linked
66/// through their `nxt` fields. Note that this differs from the chaining
67/// expected by the network device protocol:
68/// - More than [`netdev::MAX_DESCRIPTOR_CHAIN`] descriptors can be chained
69///   together.
70/// - The free list ends when the `nxt` field is 0xff, while a normal chain
71///   ends when `chain_length` becomes 0.
72struct TxFreeList {
73    /// The head of a linked list of available descriptors that can be allocated
74    /// for tx.
75    head: Option<DescId<Tx>>,
76    /// The number of free descriptors in the pool.
77    len: u16,
78}
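// An illustrative sketch of the free list above (not part of the original
// file): with free tx descriptors 0, 1 and 2, the list is threaded through the
// `nxt` fields and terminated by 0xff rather than by `chain_length`:
//
//   head = Some(0):  desc[0].nxt = 1  ->  desc[1].nxt = 2  ->  desc[2].nxt = 0xff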
79
80impl Pool {
81    /// Creates a new [`Pool`] and its backing [`zx::Vmo`]s.
82    ///
83    /// Returns [`Pool`] and the [`zx::Vmo`]s for descriptors and data, in that
84    /// order.
85    pub(in crate::session) fn new(config: Config) -> Result<(Arc<Self>, zx::Vmo, zx::Vmo)> {
86        let Config { buffer_stride, num_rx_buffers, num_tx_buffers, options, buffer_layout } =
87            config;
88        let num_buffers = num_rx_buffers.get() + num_tx_buffers.get();
89        let (descriptors, descriptors_vmo, tx_free, mut rx_free) =
90            Descriptors::new(num_tx_buffers, num_rx_buffers, buffer_stride)?;
91
92        // Construct the free list.
93        let free_head = tx_free.into_iter().rev().fold(None, |head, mut curr| {
94            descriptors.borrow_mut(&mut curr).set_nxt(head);
95            Some(curr)
96        });
97
98        for rx_desc in rx_free.iter_mut() {
99            descriptors.borrow_mut(rx_desc).initialize(
100                ChainLength::ZERO,
101                0,
102                buffer_layout.length.try_into().unwrap(),
103                0,
104            );
105        }
106
107        let tx_alloc_state = TxAllocState {
108            free_list: TxFreeList { head: free_head, len: num_tx_buffers.get() },
109            requests: VecDeque::new(),
110        };
111
112        let size = buffer_stride.get() * u64::from(num_buffers);
113        let data_vmo = zx::Vmo::create(size).map_err(|status| Error::Vmo("data", status))?;
114
115        const VMO_NAME: zx::Name =
116            const_unwrap::const_unwrap_result(zx::Name::new("netdevice:data"));
117        data_vmo.set_name(&VMO_NAME).map_err(|status| Error::Vmo("set name", status))?;
118        // `as` is OK because `size` is positive and smaller than isize::MAX.
119        // This is following the practice of rust stdlib to ensure allocation
120        // size never reaches isize::MAX.
121        // https://doc.rust-lang.org/std/primitive.pointer.html#method.add-1.
122        let len = isize::try_from(size).expect("VMO size larger than isize::MAX") as usize;
123        // The returned address of zx_vmar_map on success must be non-zero:
124        // https://fuchsia.dev/fuchsia-src/reference/syscalls/vmar_map
125        let base = NonNull::new(
126            vmar_root_self()
127                .map(0, &data_vmo, 0, len, zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE)
128                .map_err(|status| Error::Map("data", status))? as *mut u8,
129        )
130        .unwrap();
131
132        Ok((
133            Arc::new(Pool {
134                base,
135                bytes: len,
136                descriptors,
137                tx_alloc_state: Mutex::new(tx_alloc_state),
138                rx_pending: Pending::new(rx_free),
139                buffer_layout,
140                rx_leases: RxLeaseHandlingState::new_with_flags(options),
141            }),
142            descriptors_vmo,
143            data_vmo,
144        ))
145    }
146
147    /// Allocates `num_parts` tx descriptors.
148    ///
149    /// It will block if there are not enough descriptors. Note that the
150    /// descriptors are not initialized; you need to call [`AllocGuard::init()`]
151    /// on the returned [`AllocGuard`] if you want to send it to the driver
152    /// later. See [`AllocGuard<Rx>::into_tx()`] for an example where
153    /// [`AllocGuard::init()`] is not needed because the tx allocation will be
154    /// returned to the pool immediately and won't be sent to the driver.
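    ///
    /// # Example
    ///
    /// An illustrative sketch (not a doctest); `ChainLength::try_from` mirrors
    /// the checked conversions used elsewhere in this module, and `pool` is an
    /// assumed `Arc<Pool>`:
    ///
    /// ```ignore
    /// let alloc = pool.alloc_tx(ChainLength::try_from(2u8)?).await;
    /// assert_eq!(alloc.len(), 2);
    /// ```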
155    pub(in crate::session) async fn alloc_tx(
156        self: &Arc<Self>,
157        num_parts: ChainLength,
158    ) -> AllocGuard<Tx> {
159        let receiver = {
160            let mut state = self.tx_alloc_state.lock();
161            match state.free_list.try_alloc(num_parts, &self.descriptors) {
162                Some(allocated) => {
163                    return AllocGuard::new(allocated, self.clone());
164                }
165                None => {
166                    let (request, receiver) = TxAllocReq::new(num_parts);
167                    state.requests.push_back(request);
168                    receiver
169                }
170            }
171        };
172        // The sender must not be dropped.
173        receiver.await.unwrap()
174    }
175
176    /// Allocates a tx [`Buffer`].
177    ///
178    /// The returned buffer will have `num_bytes` as its capacity; the method
179    /// will block if there are not enough buffers. An error is returned if
180    /// the requested size cannot meet the device requirements, for example, if
181    /// the size of the head or tail region would become unrepresentable in u16.
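    ///
    /// # Example
    ///
    /// An illustrative sketch (not a doctest), assuming a `pool: Arc<Pool>` is
    /// in scope:
    ///
    /// ```ignore
    /// let mut buffer = pool.alloc_tx_buffer(64).await?;
    /// buffer.write_at(0, &[0xff; 64])?;
    /// assert_eq!(buffer.len(), 64);
    /// ```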
182    pub(in crate::session) async fn alloc_tx_buffer(
183        self: &Arc<Self>,
184        num_bytes: usize,
185    ) -> Result<Buffer<Tx>> {
186        self.alloc_tx_buffers(num_bytes).await?.next().unwrap()
187    }
188
189    /// Waits for at least one TX buffer to be available and returns an iterator
190    /// of buffers with `num_bytes` as capacity.
191    ///
192    /// The returned iterator is guaranteed to yield at least one item (though
193    /// it might be an error if the requested size cannot meet the device
194    /// requirement).
195    ///
196    /// # Note
197    ///
198    /// Because a `Buffer<Tx>` is returned to the pool when it is dropped, the
199    /// returned iterator can seemingly yield items forever if the yielded
200    /// `Buffer`s are dropped while iterating.
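    ///
    /// # Example
    ///
    /// An illustrative sketch (not a doctest); collecting into a `Vec` keeps
    /// the yielded buffers alive and bounds the iteration:
    ///
    /// ```ignore
    /// let buffers = pool
    ///     .alloc_tx_buffers(64)
    ///     .await?
    ///     .take(16)
    ///     .collect::<Result<Vec<_>>>()?;
    /// ```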
201    pub(in crate::session) async fn alloc_tx_buffers<'a>(
202        self: &'a Arc<Self>,
203        num_bytes: usize,
204    ) -> Result<impl Iterator<Item = Result<Buffer<Tx>>> + 'a> {
205        let BufferLayout { min_tx_data, min_tx_head, min_tx_tail, length: buffer_length } =
206            self.buffer_layout;
207        let tx_head = usize::from(min_tx_head);
208        let tx_tail = usize::from(min_tx_tail);
209        let total_bytes = num_bytes.max(min_tx_data) + tx_head + tx_tail;
210        let num_parts = (total_bytes + buffer_length - 1) / buffer_length;
211        let chain_length = ChainLength::try_from(num_parts)?;
212        let first = self.alloc_tx(chain_length).await;
213        let iter = std::iter::once(first)
214            .chain(std::iter::from_fn(move || {
215                let mut state = self.tx_alloc_state.lock();
216                state
217                    .free_list
218                    .try_alloc(chain_length, &self.descriptors)
219                    .map(|allocated| AllocGuard::new(allocated, self.clone()))
220            }))
221            // Fuse afterwards so we guarantee we can't yield a new entry
222            // after having yielded `None` once.
223            .fuse()
224            .map(move |mut alloc| {
225                alloc.init(num_bytes)?;
226                Ok(alloc.into())
227            });
228        Ok(iter)
229    }
230
231    /// Frees rx descriptors.
232    pub(in crate::session) fn free_rx(&self, descs: impl IntoIterator<Item = DescId<Rx>>) {
233        self.rx_pending.extend(descs.into_iter().map(|mut desc| {
234            self.descriptors.borrow_mut(&mut desc).initialize(
235                ChainLength::ZERO,
236                0,
237                self.buffer_layout.length.try_into().unwrap(),
238                0,
239            );
240            desc
241        }));
242    }
243
244    /// Frees tx descriptors.
245    ///
246    /// # Panics
247    ///
248    /// Panics if given an empty chain.
249    fn free_tx(self: &Arc<Self>, chain: Chained<DescId<Tx>>) {
250        let free_impl = |free_list: &mut TxFreeList, chain: Chained<DescId<Tx>>| {
251            let mut descs = chain.into_iter();
252            // The following can't overflow because we can have at most u16::MAX
253            // descriptors: free_len + #(to_free) + #(descs in use) <= u16::MAX;
254            // thus free_len + #(to_free) <= u16::MAX.
255            free_list.len += u16::try_from(descs.len()).unwrap();
256            let head = descs.next();
257            let old_head = std::mem::replace(&mut free_list.head, head);
258            let mut tail = descs.last();
259            let mut tail_ref = self
260                .descriptors
261                .borrow_mut(tail.as_mut().unwrap_or_else(|| free_list.head.as_mut().unwrap()));
262            tail_ref.set_nxt(old_head);
263        };
264
265        let mut state = self.tx_alloc_state.lock();
266        let TxAllocState { requests, free_list } = &mut *state;
267        let () = free_impl(free_list, chain);
268
269        // After putting the chain back into the free list, we try to fulfill
270        // any pending tx allocation requests.
271        while let Some(req) = requests.front() {
272            match free_list.try_alloc(req.size, &self.descriptors) {
273                Some(descs) => {
274                    // The unwrap is safe because we know requests is not empty.
275                    match requests
276                        .pop_front()
277                        .unwrap()
278                        .sender
279                        .send(AllocGuard::new(descs, self.clone()))
280                        .map_err(ManuallyDrop::new)
281                    {
282                        Ok(()) => {}
283                        Err(mut alloc) => {
284                            let AllocGuard { descs, pool } = alloc.deref_mut();
285                            // We can't run the Drop code for AllocGuard here to
286                            // return the descriptors though, because we are holding
287                            // the lock on the alloc state and the lock is not
288                            // reentrant, so we manually free the descriptors.
289                            let () =
290                                free_impl(free_list, std::mem::replace(descs, Chained::empty()));
291                            // Safety: alloc is wrapped in ManuallyDrop, so alloc.pool
292                            // will not be dropped twice.
293                            let () = unsafe {
294                                std::ptr::drop_in_place(pool);
295                            };
296                        }
297                    }
298                }
299                None => {
300                    if req.sender.is_canceled() {
301                        let _cancelled: Option<TxAllocReq> = requests.pop_front();
302                        continue;
303                    } else {
304                        break;
305                    }
306                }
307            }
308        }
309    }
310
311    /// Frees the completed tx descriptors chained from `head` back to the pool.
312    ///
313    /// Call this function when the driver hands back a completed tx descriptor.
314    pub(in crate::session) fn tx_completed(self: &Arc<Self>, head: DescId<Tx>) -> Result<()> {
315        let chain = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
316        Ok(self.free_tx(chain))
317    }
318
319    /// Creates a [`Buffer<Rx>`] corresponding to the completed rx descriptors.
320    ///
321    /// Whenever the driver hands back a completed rx descriptor, this function
322    /// can be used to create the buffer that is represented by those chained
323    /// descriptors.
324    pub(in crate::session) fn rx_completed(
325        self: &Arc<Self>,
326        head: DescId<Rx>,
327    ) -> Result<Buffer<Rx>> {
328        let descs = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
329        let alloc = AllocGuard::new(descs, self.clone());
330        Ok(alloc.into())
331    }
332}
333
334impl Drop for Pool {
335    fn drop(&mut self) {
336        unsafe {
337            vmar_root_self()
338                .unmap(self.base.as_ptr() as usize, self.bytes)
339                .expect("failed to unmap VMO for Pool")
340        }
341    }
342}
343
344impl TxFreeList {
345    /// Tries to allocate tx descriptors.
346    ///
347    /// Returns [`None`] if there are not enough descriptors.
348    fn try_alloc(
349        &mut self,
350        num_parts: ChainLength,
351        descriptors: &Descriptors,
352    ) -> Option<Chained<DescId<Tx>>> {
353        if u16::from(num_parts.get()) > self.len {
354            return None;
355        }
356
357        let free_list = std::iter::from_fn(|| -> Option<DescId<Tx>> {
358            let new_head = self.head.as_ref().and_then(|head| {
359                let nxt = descriptors.borrow(head).nxt();
360                nxt.map(|id| unsafe {
361                    // Safety: This is the nxt field of the head of the free
362                    // list; it must be a tx descriptor id.
363                    DescId::from_raw(id)
364                })
365            });
366            std::mem::replace(&mut self.head, new_head)
367        });
368        let allocated = free_list.take(num_parts.get().into()).collect::<Chained<_>>();
369        assert_eq!(allocated.len(), num_parts.into());
370        self.len -= u16::from(num_parts.get());
371        Some(allocated)
372    }
373}
374
375/// The buffer that can be used by the [`Session`](crate::session::Session).
376///
377/// All [`Buffer`]s implement [`std::io::Read`] and [`Buffer<Tx>`]s implement
378/// [`std::io::Write`].
379pub struct Buffer<K: AllocKind> {
380    /// The descriptors allocation.
381    alloc: AllocGuard<K>,
382    /// Underlying memory regions.
383    parts: Chained<BufferPart>,
384    /// The current absolute position to read/write within the [`Buffer`].
385    pos: usize,
386}
387
388impl<K: AllocKind> Debug for Buffer<K> {
389    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
390        let Self { alloc, parts, pos } = self;
391        f.debug_struct("Buffer")
392            .field("cap", &self.cap())
393            .field("alloc", alloc)
394            .field("parts", parts)
395            .field("pos", pos)
396            .finish()
397    }
398}
399
400impl<K: AllocKind> Buffer<K> {
401    /// Gets the capacity of the buffer in bytes as requested for allocation.
402    pub fn cap(&self) -> usize {
403        self.parts.iter().fold(0, |acc, part| acc + part.cap)
404    }
405
406    /// Gets the length of the buffer which is actually used.
407    pub fn len(&self) -> usize {
408        self.parts.iter().fold(0, |acc, part| acc + part.len)
409    }
410
411    /// Writes bytes to the buffer.
412    ///
413    /// Writes all `src.len()` bytes into the buffer beginning at `offset`.
414    /// Returns an error if the buffer is not large enough to hold all of
415    /// `src` starting at `offset`; partial writes are never performed.
416    pub fn write_at(&mut self, offset: usize, src: &[u8]) -> Result<()> {
417        if self.cap() < offset + src.len() {
418            return Err(Error::TooSmall { size: self.cap(), offset, length: src.len() });
419        }
420        let mut part_start = 0;
421        let mut total = 0;
422        for part in self.parts.iter_mut() {
423            if offset + total < part_start + part.cap {
424                let written = part.write_at(offset + total - part_start, &src[total..])?;
425                total += written;
426                if total == src.len() {
427                    break;
428                }
429            } else {
430                part.len = part.cap;
431            }
432            part_start += part.cap;
433        }
434        assert_eq!(total, src.len());
435        Ok(())
436    }
437
438    /// Reads bytes from the buffer.
439    ///
440    /// Reads exactly `dst.len()` bytes from the buffer beginning at `offset`.
441    /// Returns an error if the buffer does not contain enough bytes starting
442    /// at `offset`; a partial read is considered an error.
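    ///
    /// # Example
    ///
    /// An illustrative round trip with [`Buffer::write_at`] (not a doctest),
    /// assuming a tx `buffer` with enough capacity:
    ///
    /// ```ignore
    /// buffer.write_at(0, b"hello")?;
    /// let mut dst = [0u8; 5];
    /// buffer.read_at(0, &mut dst)?;
    /// assert_eq!(&dst, b"hello");
    /// ```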
443    pub fn read_at(&self, offset: usize, dst: &mut [u8]) -> Result<()> {
444        if self.len() < offset + dst.len() {
445            return Err(Error::TooSmall { size: self.len(), offset, length: dst.len() });
446        }
447        let mut part_start = 0;
448        let mut total = 0;
449        for part in self.parts.iter() {
450            if offset + total < part_start + part.cap {
451                let read = part.read_at(offset + total - part_start, &mut dst[total..])?;
452                total += read;
453                if total == dst.len() {
454                    break;
455                }
456            }
457            part_start += part.cap;
458        }
459        assert_eq!(total, dst.len());
460        Ok(())
461    }
462
463    /// Returns this buffer as a mutable slice if it's not fragmented.
464    pub fn as_slice_mut(&mut self) -> Option<&mut [u8]> {
465        match &mut (*self.parts)[..] {
466            [] => Some(&mut []),
467            [one] => Some(one.as_slice_mut()),
468            _ => None,
469        }
470    }
471
472    /// Pads the [`Buffer`] to minimum tx buffer length requirements.
473    pub(in crate::session) fn pad(&mut self) -> Result<()> {
474        let num_parts = self.parts.len();
475        let BufferLayout { min_tx_tail, min_tx_data, min_tx_head: _, length: _ } =
476            self.alloc.pool.buffer_layout;
477        let mut target = min_tx_data;
478        for (i, part) in self.parts.iter_mut().enumerate() {
479            let grow_cap = if i == num_parts - 1 {
480                let descriptor =
481                    self.alloc.descriptors().last().expect("descriptor must not be empty");
482                let data_length = descriptor.data_length();
483                let tail_length = descriptor.tail_length();
484                // data_length + tail_length <= buffer_length <= usize::MAX.
485                let rest = usize::try_from(data_length).unwrap() + usize::from(tail_length);
486                match rest.checked_sub(usize::from(min_tx_tail)) {
487                    Some(grow_cap) => Some(grow_cap),
488                    None => break,
489                }
490            } else {
491                None
492            };
493            target -= part.pad(target, grow_cap)?;
494        }
495        if target != 0 {
496            return Err(Error::Pad(min_tx_data, self.cap()));
497        }
498        Ok(())
499    }
500
501    /// Leaks the underlying buffer descriptors to the driver.
502    ///
503    /// Returns the head of the leaked allocation.
504    pub(in crate::session) fn leak(mut self) -> DescId<K> {
505        let descs = std::mem::replace(&mut self.alloc.descs, Chained::empty());
506        descs.into_iter().next().unwrap()
507    }
508
509    /// Retrieves the frame type of the buffer.
510    pub fn frame_type(&self) -> Result<netdev::FrameType> {
511        self.alloc.descriptor().frame_type()
512    }
513
514    /// Retrieves the buffer's source port.
515    pub fn port(&self) -> Port {
516        self.alloc.descriptor().port()
517    }
518}
519
520impl Buffer<Tx> {
521    /// Commits the metadata for the buffer to descriptors.
522    pub(in crate::session) fn commit(&mut self) {
523        for (part, mut descriptor) in self.parts.iter_mut().zip(self.alloc.descriptors_mut()) {
524            // The following unwrap is safe because part.len must be smaller than
525            // buffer_length, which is a u32.
526            descriptor.commit(u32::try_from(part.len).unwrap())
527        }
528    }
529
530    /// Sets the buffer's destination port.
531    pub fn set_port(&mut self, port: Port) {
532        self.alloc.descriptor_mut().set_port(port)
533    }
534
535    /// Sets the frame type of the buffer.
536    pub fn set_frame_type(&mut self, frame_type: netdev::FrameType) {
537        self.alloc.descriptor_mut().set_frame_type(frame_type)
538    }
539
540    /// Sets TxFlags of a Tx buffer.
541    pub fn set_tx_flags(&mut self, flags: netdev::TxFlags) {
542        self.alloc.descriptor_mut().set_tx_flags(flags)
543    }
544}
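// An illustrative sketch (not part of the original file) of preparing a tx
// buffer with the setters above; `port` and `frame_bytes` are assumed values
// and `netdev::FrameType::Ethernet` is just an example frame type:
//
//     buffer.set_port(port);
//     buffer.set_frame_type(netdev::FrameType::Ethernet);
//     buffer.write_at(0, &frame_bytes[..])?;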
545
546impl Buffer<Rx> {
547    /// Turns an rx buffer into a tx one.
548    pub async fn into_tx(self) -> Buffer<Tx> {
549        let Buffer { alloc, parts, pos } = self;
550        Buffer { alloc: alloc.into_tx().await, parts, pos }
551    }
552
553    /// Retrieves RxFlags of an Rx Buffer.
554    pub fn rx_flags(&self) -> Result<netdev::RxFlags> {
555        self.alloc.descriptor().rx_flags()
556    }
557}
558
559impl AllocGuard<Rx> {
560    /// Turns an rx allocation into a tx one.
561    ///
562    /// To achieve this we have to move the same number of descriptors from
563    /// the tx pool to the rx pool to compensate for the rx descriptors that
564    /// are being converted into tx ones.
565    async fn into_tx(mut self) -> AllocGuard<Tx> {
566        let mut tx = self.pool.alloc_tx(self.descs.len).await;
567        // [MaybeUninit<DescId<Tx>>; 4] and [MaybeUninit<DescId<Rx>>; 4] have
568        // the same memory layout because DescId is repr(transparent). So it is
569        // safe to transmute and swap the values between the storages. After the
570        // swap, the drop implementation of self will return the descriptors
571        // back to the rx pool.
572        std::mem::swap(&mut self.descs.storage, unsafe {
573            std::mem::transmute(&mut tx.descs.storage)
574        });
575        tx
576    }
577}
578
579/// A non-empty container that has at most [`netdev::MAX_DESCRIPTOR_CHAIN`] elements.
580struct Chained<T> {
581    storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
582    len: ChainLength,
583}
584
585impl<T> Deref for Chained<T> {
586    type Target = [T];
587
588    fn deref(&self) -> &Self::Target {
589        // Safety: `self.storage[..self.len]` is already initialized.
590        unsafe { std::mem::transmute(&self.storage[..self.len.into()]) }
591    }
592}
593
594impl<T> DerefMut for Chained<T> {
595    fn deref_mut(&mut self) -> &mut Self::Target {
596        // Safety: `self.storage[..self.len]` is already initialized.
597        unsafe { std::mem::transmute(&mut self.storage[..self.len.into()]) }
598    }
599}
600
601impl<T> Drop for Chained<T> {
602    fn drop(&mut self) {
603        // Safety: `self.deref_mut()` is a slice of all initialized elements.
604        unsafe {
605            std::ptr::drop_in_place(self.deref_mut());
606        }
607    }
608}
609
610impl<T: Debug> Debug for Chained<T> {
611    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
612        f.debug_list().entries(self.iter()).finish()
613    }
614}
615
616impl<T> Chained<T> {
617    #[allow(clippy::uninit_assumed_init)]
618    fn empty() -> Self {
619        // Create an uninitialized array of `MaybeUninit`. The `assume_init` is
620        // safe because the type we are claiming to have initialized here is a
621        // bunch of `MaybeUninit`s, which do not require initialization.
622        // TODO(https://fxbug.dev/42160423): use MaybeUninit::uninit_array once it
622        // is stabilized.
624        // https://doc.rust-lang.org/std/mem/union.MaybeUninit.html#method.uninit_array
625        Self { storage: unsafe { MaybeUninit::uninit().assume_init() }, len: ChainLength::ZERO }
626    }
627}
628
629impl<T> FromIterator<T> for Chained<T> {
630    /// # Panics
631    ///
632    /// Panics if the iterator is empty or yields more than
633    /// [`netdev::MAX_DESCRIPTOR_CHAIN`] elements.
634    fn from_iter<I: IntoIterator<Item = T>>(elements: I) -> Self {
635        let mut result = Self::empty();
636        let mut len = 0u8;
637        for (idx, e) in elements.into_iter().enumerate() {
638            result.storage[idx] = MaybeUninit::new(e);
639            len += 1;
640        }
641        assert!(len > 0);
642        // `len` can not be larger than `MAX_DESCRIPTOR_CHAIN`, otherwise we can't
643        // get here due to the bound checks on `result.storage`.
644        result.len = ChainLength::try_from(len).unwrap();
645        result
646    }
647}
648
649impl<T> IntoIterator for Chained<T> {
650    type Item = T;
651    type IntoIter = ChainedIter<T>;
652
653    fn into_iter(mut self) -> Self::IntoIter {
654        let len = self.len;
655        self.len = ChainLength::ZERO;
656        // Safety: we have reset the length to zero, so it is now safe to move
657        // out the values and set them to be uninitialized. The `assume_init` is
658        // safe because the type we are claiming to have initialized here is a
659        // bunch of `MaybeUninit`s, which do not require initialization.
660        // TODO(https://fxbug.dev/42160423): use MaybeUninit::uninit_array once it
661        // is stabilized.
662        #[allow(clippy::uninit_assumed_init)]
663        let storage =
664            std::mem::replace(&mut self.storage, unsafe { MaybeUninit::uninit().assume_init() });
665        ChainedIter { storage, len, consumed: 0 }
666    }
667}
668
669struct ChainedIter<T> {
670    storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
671    len: ChainLength,
672    consumed: u8,
673}
674
675impl<T> Iterator for ChainedIter<T> {
676    type Item = T;
677
678    fn next(&mut self) -> Option<Self::Item> {
679        if self.consumed < self.len.get() {
680            // Safety: the slot at `consumed` is initialized; we advance
681            // `consumed` by 1 so the value is never read or dropped again.
682            let value = unsafe {
683                std::mem::replace(
684                    &mut self.storage[usize::from(self.consumed)],
685                    MaybeUninit::uninit(),
686                )
687                .assume_init()
688            };
689            self.consumed += 1;
690            Some(value)
691        } else {
692            None
693        }
694    }
695
696    fn size_hint(&self) -> (usize, Option<usize>) {
697        let len = usize::from(self.len.get() - self.consumed);
698        (len, Some(len))
699    }
700}
701
702impl<T> ExactSizeIterator for ChainedIter<T> {}
703
704impl<T> Drop for ChainedIter<T> {
705    fn drop(&mut self) {
706        // Safety: `self.storage[self.consumed..self.len]` is initialized.
707        unsafe {
708            std::ptr::drop_in_place(std::mem::transmute::<_, &mut [T]>(
709                &mut self.storage[self.consumed.into()..self.len.into()],
710            ));
711        }
712    }
713}
714
715/// Guards the allocated descriptors; they will be freed when dropped.
716pub(in crate::session) struct AllocGuard<K: AllocKind> {
717    descs: Chained<DescId<K>>,
718    pool: Arc<Pool>,
719}
720
721impl<K: AllocKind> Debug for AllocGuard<K> {
722    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
723        let Self { descs, pool: _ } = self;
724        f.debug_struct("AllocGuard").field("descs", descs).finish()
725    }
726}
727
728impl<K: AllocKind> AllocGuard<K> {
729    fn new(descs: Chained<DescId<K>>, pool: Arc<Pool>) -> Self {
730        Self { descs, pool }
731    }
732
733    /// Iterates over references to the descriptors.
734    fn descriptors(&self) -> impl Iterator<Item = DescRef<'_, K>> + '_ {
735        self.descs.iter().map(move |desc| self.pool.descriptors.borrow(desc))
736    }
737
738    /// Iterates over mutable references to the descriptors.
739    fn descriptors_mut(&mut self) -> impl Iterator<Item = DescRefMut<'_, K>> + '_ {
740        let descriptors = &self.pool.descriptors;
741        self.descs.iter_mut().map(move |desc| descriptors.borrow_mut(desc))
742    }
743
744    /// Gets a reference to the head descriptor.
745    fn descriptor(&self) -> DescRef<'_, K> {
746        self.descriptors().next().expect("descriptors must not be empty")
747    }
748
749    /// Gets a mutable reference to the head descriptor.
750    fn descriptor_mut(&mut self) -> DescRefMut<'_, K> {
751        self.descriptors_mut().next().expect("descriptors must not be empty")
752    }
753}
754
755impl AllocGuard<Tx> {
756    /// Initializes descriptors of a tx allocation.
757    fn init(&mut self, mut requested_bytes: usize) -> Result<()> {
758        let len = self.len();
759        let BufferLayout { min_tx_head, min_tx_tail, length: buffer_length, min_tx_data: _ } =
760            self.pool.buffer_layout;
761        for (mut descriptor, clen) in self.descriptors_mut().zip((0..len).rev()) {
762            let chain_length = ChainLength::try_from(clen).unwrap();
763            let head_length = if clen + 1 == len { min_tx_head } else { 0 };
764            let mut tail_length = if clen == 0 { min_tx_tail } else { 0 };
765
766            // buffer_length always covers head_length and tail_length; this was
767            // checked when the pool config was created, so the subtraction won't overflow.
768            let available_bytes =
769                u32::try_from(buffer_length - usize::from(head_length) - usize::from(tail_length))
770                    .unwrap();
771
772            let data_length = match u32::try_from(requested_bytes) {
773                Ok(requested) => {
774                    if requested < available_bytes {
775                        // The requested bytes are less than what is available,
776                        // we need to put the excess in the tail so that the
777                        // user cannot write more than they requested.
778                        tail_length = u16::try_from(available_bytes - requested)
779                            .ok_checked::<TryFromIntError>()
780                            .and_then(|tail_adjustment| tail_length.checked_add(tail_adjustment))
781                            .ok_or(Error::TxLength)?;
782                    }
783                    requested.min(available_bytes)
784                }
785                Err(TryFromIntError { .. }) => available_bytes,
786            };
787
788            requested_bytes -=
789                usize::try_from(data_length).unwrap_or_else(|TryFromIntError { .. }| {
790                    panic!(
791                        "data_length: {} must be smaller than requested_bytes: {}, which is a usize",
792                        data_length, requested_bytes
793                    )
794                });
795            descriptor.initialize(chain_length, head_length, data_length, tail_length);
796        }
797        assert_eq!(requested_bytes, 0);
798        Ok(())
799    }
800}
801
802impl<K: AllocKind> Drop for AllocGuard<K> {
803    fn drop(&mut self) {
804        if self.is_empty() {
805            return;
806        }
807        K::free(private::Allocation(self));
808    }
809}
810
811impl<K: AllocKind> Deref for AllocGuard<K> {
812    type Target = [DescId<K>];
813
814    fn deref(&self) -> &Self::Target {
815        self.descs.deref()
816    }
817}
818
819/// A contiguous region of the buffer; corresponding to one descriptor.
820///
821/// [`BufferPart`] owns the memory range [ptr, ptr+cap).
822struct BufferPart {
823    /// The data region starts at `ptr`.
824    ptr: *mut u8,
825    /// The capacity for the region is `cap`.
826    cap: usize,
827    /// Indicates how many bytes are actually in the buffer. It
828    /// starts as 0 for a tx buffer and as `cap` for an rx buffer. It will
829    /// be used later as `data_length` in the descriptor.
830    len: usize,
831}
832
833impl BufferPart {
834    /// Creates a new [`BufferPart`] that owns the memory region.
835    ///
836    /// # Safety
837    ///
838    /// The caller must make sure the memory pointed to by `ptr` outlives the
839    /// `BufferPart` being constructed. Once a `BufferPart` is constructed, it is
840    /// assumed that the memory `[ptr..ptr+cap)` is always valid to read and
841    /// write.
842    unsafe fn new(ptr: *mut u8, cap: usize, len: usize) -> Self {
843        Self { ptr, cap, len }
844    }
845
846    /// Reads bytes from this buffer part.
847    ///
848    /// Reads up to `dst.len()` bytes from the region beginning at `offset`,
849    /// returning how many bytes were read successfully. Partial read is
850    /// not considered as an error.
851    fn read_at(&self, offset: usize, dst: &mut [u8]) -> Result<usize> {
852        let available = self.len.checked_sub(offset).ok_or(Error::Index(offset, self.len))?;
853        let to_copy = std::cmp::min(available, dst.len());
854        // Safety: the source memory region is valid for reads and the
855        // destination memory region is valid for writes.
856        unsafe { std::ptr::copy_nonoverlapping(self.ptr.add(offset), dst.as_mut_ptr(), to_copy) }
857        Ok(to_copy)
858    }
859
860    /// Writes bytes to this buffer part.
861    ///
862    /// Writes up to `src.len()` bytes into the region beginning at `offset`,
863    /// returning how many bytes were written successfully. Partial write is
864    /// not considered as an error.
865    fn write_at(&mut self, offset: usize, src: &[u8]) -> Result<usize> {
866        let available = self.cap.checked_sub(offset).ok_or(Error::Index(offset, self.cap))?;
867        let to_copy = std::cmp::min(src.len(), available);
868        // Safety: the source memory region is valid for reads and the
869        // destination memory region is valid for writes.
870        unsafe { std::ptr::copy_nonoverlapping(src.as_ptr(), self.ptr.add(offset), to_copy) }
871        self.len = std::cmp::max(self.len, offset + to_copy);
872        Ok(to_copy)
873    }
874
875    /// Pads this part of buffer to have length `target`.
876    ///
877    /// `limit` describes how far this region may grow beyond its capacity.
878    /// `None` means the part is not allowed to grow and padding must be done
879    /// within the existing capacity; `Some(limit)` means this part is allowed
880    /// to extend its capacity up to the limit.
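    ///
    /// For example (illustrative): with `len == 2` and `cap == 4`, calling
    /// `pad(6, Some(8))` grows `cap` to 6, zero-fills bytes `2..6` and returns
    /// `Ok(6)`; the same call with `limit == None` only zero-fills up to the
    /// existing `cap` and returns `Ok(4)`.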
881    fn pad(&mut self, target: usize, limit: Option<usize>) -> Result<usize> {
882        if target <= self.len {
883            return Ok(target);
884        }
885        if let Some(limit) = limit {
886            if target > limit {
887                return Err(Error::Pad(target, self.cap));
888            }
889            if self.cap < target {
890                self.cap = target
891            }
892        }
893        let new_len = std::cmp::min(target, self.cap);
894        // Safety: This is safe because the destination memory region is valid
895        // for write.
896        unsafe {
897            std::ptr::write_bytes(self.ptr.add(self.len), 0, new_len - self.len);
898        }
899        self.len = new_len;
900        Ok(new_len)
901    }
902
903    /// Returns the buffer part as a slice.
904    fn as_slice_mut(&mut self) -> &mut [u8] {
905        // SAFETY: BufferPart::new requires the caller to guarantee that the
906        // memory behind `ptr` outlives the instance. BufferPart is neither Copy
907        // nor Clone, so we can rely on the unsafety of `new` to uphold this.
908        unsafe { std::slice::from_raw_parts_mut(self.ptr, self.len) }
909    }
910}
911
912// `Buffer` needs to be `Send` in order to be useful in async code. Instead
913// of marking `Buffer` as `Send` directly, we mark `BufferPart` as `Send`
914// and let the compiler deduce the rest.
915unsafe impl Send for BufferPart {}
916
917impl Debug for BufferPart {
918    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
919        let BufferPart { len, cap, ptr } = &self;
920        f.debug_struct("BufferPart").field("ptr", ptr).field("len", len).field("cap", cap).finish()
921    }
922}
923
924impl<K: AllocKind> From<AllocGuard<K>> for Buffer<K> {
925    fn from(alloc: AllocGuard<K>) -> Self {
926        let AllocGuard { pool, descs: _ } = &alloc;
927        let parts: Chained<BufferPart> = alloc
928            .descriptors()
929            .map(|descriptor| {
930                // The following unwraps are safe because the values were
931                // already checked in `DeviceInfo::config`.
932                let offset = usize::try_from(descriptor.offset()).unwrap();
933                let head_length = usize::from(descriptor.head_length());
934                let data_length = usize::try_from(descriptor.data_length()).unwrap();
935                let len = match K::REFL {
936                    AllocKindRefl::Tx => 0,
937                    AllocKindRefl::Rx => data_length,
938                };
939                // Sanity check: make sure the layout is valid.
940                assert!(
941                    offset + head_length <= pool.bytes,
942                    "buffer part starts beyond the end of pool"
943                );
944                assert!(
945                    offset + head_length + data_length <= pool.bytes,
946                    "buffer part ends beyond the end of pool"
947                );
948                // This is safe because the `AllocGuard` makes sure the
949                // underlying memory is valid for the entire time when
950                // `BufferPart` is alive; `add` is safe because
951                // `offset + head_length` is within the allocation and
952                // smaller than isize::MAX.
953                unsafe {
954                    BufferPart::new(pool.base.as_ptr().add(offset + head_length), data_length, len)
955                }
956            })
957            .collect();
958        Self { alloc, parts, pos: 0 }
959    }
960}
961
962impl<K: AllocKind> Read for Buffer<K> {
963    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
964        self.read_at(self.pos, buf)
965            .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidInput, error))?;
966        self.pos += buf.len();
967        Ok(buf.len())
968    }
969}
970
971impl Write for Buffer<Tx> {
972    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
973        self.write_at(self.pos, buf)
974            .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidInput, error))?;
975        self.pos += buf.len();
976        Ok(buf.len())
977    }
978
979    fn flush(&mut self) -> std::io::Result<()> {
980        Ok(())
981    }
982}
983
984impl<K: AllocKind> Seek for Buffer<K> {
985    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
986        let pos = match pos {
987            SeekFrom::Start(pos) => pos,
988            SeekFrom::End(offset) => {
989                let end = i64::try_from(self.cap())
990                    .map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
991                u64::try_from(end.wrapping_add(offset)).unwrap()
992            }
993            SeekFrom::Current(offset) => {
994                let current = i64::try_from(self.pos)
995                    .map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
996                u64::try_from(current.wrapping_add(offset)).unwrap()
997            }
998        };
999        self.pos =
1000            usize::try_from(pos).map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
1001        Ok(pos)
1002    }
1003}
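// An illustrative sketch (not part of the original file) of the `std::io`
// impls above, assuming a `buffer: Buffer<Tx>` with enough capacity is in
// scope:
//
//     use std::io::{Read, Seek, SeekFrom, Write};
//     buffer.write_all(b"header")?;
//     buffer.seek(SeekFrom::Start(0))?;
//     let mut header = [0u8; 6];
//     buffer.read_exact(&mut header)?;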
1004
1005/// A pending tx allocation request.
1006struct TxAllocReq {
1007    sender: Sender<AllocGuard<Tx>>,
1008    size: ChainLength,
1009}
1010
1011impl TxAllocReq {
1012    fn new(size: ChainLength) -> (Self, Receiver<AllocGuard<Tx>>) {
1013        let (sender, receiver) = channel();
1014        (TxAllocReq { sender, size }, receiver)
1015    }
1016}
1017
1018/// A module for sealed traits so that users of this crate cannot implement
1019/// [`AllocKind`] for anything other than [`Rx`] and [`Tx`].
1020mod private {
1021    use super::{AllocKind, Rx, Tx};
1022    pub trait Sealed: 'static + Sized {}
1023    impl Sealed for Rx {}
1024    impl Sealed for Tx {}
1025
1026    // We can't leak a private type in a public trait, so create an opaque
1027    // private newtype for &mut super::AllocGuard so that we can mention it in
1028    // the AllocKind trait.
1029    pub struct Allocation<'a, K: AllocKind>(pub(super) &'a mut super::AllocGuard<K>);
1030}
1031
1032/// An allocation can have two kinds; this trait provides a way to project a
1033/// type ([`Rx`] or [`Tx`]) into a value.
1034pub trait AllocKind: private::Sealed {
1035    /// The reflected value of Self.
1036    const REFL: AllocKindRefl;
1037
1038    /// Frees an allocation of the given kind.
1039    fn free(alloc: private::Allocation<'_, Self>);
1040}
1041
1042/// A tag type for Tx allocations.
1043pub enum Tx {}
1044/// A tag type for Rx allocations.
1045pub enum Rx {}
1046
1047/// The reflected value that allows inspection on an [`AllocKind`] type.
1048pub enum AllocKindRefl {
1049    Tx,
1050    Rx,
1051}
1052
1053impl AllocKindRefl {
1054    pub(in crate::session) fn as_str(&self) -> &'static str {
1055        match self {
1056            AllocKindRefl::Tx => "Tx",
1057            AllocKindRefl::Rx => "Rx",
1058        }
1059    }
1060}
1061
1062impl AllocKind for Tx {
1063    const REFL: AllocKindRefl = AllocKindRefl::Tx;
1064
1065    fn free(alloc: private::Allocation<'_, Self>) {
1066        let private::Allocation(AllocGuard { pool, descs }) = alloc;
1067        pool.free_tx(std::mem::replace(descs, Chained::empty()));
1068    }
1069}
1070
1071impl AllocKind for Rx {
1072    const REFL: AllocKindRefl = AllocKindRefl::Rx;
1073
1074    fn free(alloc: private::Allocation<'_, Self>) {
1075        let private::Allocation(AllocGuard { pool, descs }) = alloc;
1076        pool.free_rx(std::mem::replace(descs, Chained::empty()));
1077        pool.rx_leases.rx_complete();
1078    }
1079}
1080
1081/// An extracted struct containing state pertaining to watching rx leases.
1082pub(in crate::session) struct RxLeaseHandlingState {
1083    can_watch_rx_leases: AtomicBool,
1084    /// Keeps a rolling counter of received rx frames MINUS the target frame
1085    /// number of the current outstanding lease.
1086    ///
1087    /// When no leases are pending (via [`RxLeaseWatcher::wait_until`]),
1088    /// then this matches exactly the number of received frames.
1089    ///
1090    /// Otherwise, the lease is currently waiting for the remaining `u64::MAX -
1091    /// rx_frame_counter` frames. The logic depends on `AtomicU64` wrapping
1092    /// around as part of completing rx buffers.
1093    rx_frame_counter: AtomicU64,
1094    rx_lease_waker: AtomicWaker,
1095}
1096
1097impl RxLeaseHandlingState {
1098    fn new_with_flags(flags: netdev::SessionFlags) -> Self {
1099        Self::new_with_enabled(flags.contains(netdev::SessionFlags::RECEIVE_RX_POWER_LEASES))
1100    }
1101
1102    fn new_with_enabled(enabled: bool) -> Self {
1103        Self {
1104            can_watch_rx_leases: AtomicBool::new(enabled),
1105            rx_frame_counter: AtomicU64::new(0),
1106            rx_lease_waker: AtomicWaker::new(),
1107        }
1108    }
1109
1110    /// Increments the total receive frame counter and possibly wakes up a
1111    /// waiting lease yielder.
1112    fn rx_complete(&self) {
1113        let Self { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } = self;
1114        let prev = rx_frame_counter.fetch_add(1, atomic::Ordering::SeqCst);
1115
1116        // See wait_until for details. We need to fire the waker whenever our
1117        // add wraps the u64 back around to 0.
1118        if prev == u64::MAX {
1119            rx_lease_waker.wake();
1120        }
1121    }
1122}
1123
1124/// A trait allowing [`RxLeaseWatcher`] to be agnostic over how to get an
1125/// [`RxLeaseHandlingState`].
1126pub(in crate::session) trait RxLeaseHandlingStateContainer {
1127    fn lease_handling_state(&self) -> &RxLeaseHandlingState;
1128}
1129
1130impl<T: Borrow<RxLeaseHandlingState>> RxLeaseHandlingStateContainer for T {
1131    fn lease_handling_state(&self) -> &RxLeaseHandlingState {
1132        self.borrow()
1133    }
1134}
1135
1136impl RxLeaseHandlingStateContainer for Arc<Pool> {
1137    fn lease_handling_state(&self) -> &RxLeaseHandlingState {
1138        &self.rx_leases
1139    }
1140}
1141
1142/// A type-safe wrapper around the single lease watcher allowed per `Pool`.
1143pub(in crate::session) struct RxLeaseWatcher<T> {
1144    state: T,
1145}
1146
1147impl<T: RxLeaseHandlingStateContainer> RxLeaseWatcher<T> {
1148    /// Creates a new lease watcher.
1149    ///
1150    /// # Panics
1151    ///
1152    /// Panics if an [`RxLeaseWatcher`] has already been created for the given
1153    /// pool or the pool was not configured for it.
1154    pub(in crate::session) fn new(state: T) -> Self {
1155        assert!(
1156            state.lease_handling_state().can_watch_rx_leases.swap(false, atomic::Ordering::SeqCst),
1157            "can't watch rx leases"
1158        );
1159        Self { state }
1160    }
1161
1162    /// Called by sessions to wait until `hold_until_frame` is fulfilled to
1163    /// yield leases out.
1164    ///
1165    /// Blocks until the `hold_until_frame`-th rx buffer has been released.
1166    ///
1167    /// Note that this method takes `&mut self` because only one
1168    /// [`RxLeaseWatcher`] may be created per lease handling state, and
1169    /// exclusive access to it is required to watch lease completion.
1170    pub(in crate::session) async fn wait_until(&mut self, hold_until_frame: u64) {
1171        // A note about wrap-arounds.
1172        //
1173        // We're assuming the frame counter will never wrap around for
1174        // correctness here. This should be fine, even assuming a packet
1175        // rate of 1 million pps it'd take almost 600k years for this counter
1176        // to wrap around:
1177        // - 2^64 / 1e6 / 60 / 60 / 24 / 365 ~ 584e3.
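        //
        // A worked example (illustrative): with `hold_until_frame == 10` and
        // only 4 frames received so far, `prev == 4`, the subtraction below
        // leaves the counter at `u64::MAX - 5` and `threshold == u64::MAX - 5`.
        // Each of the next 6 completed rx buffers increments the counter; the
        // 6th increment wraps it to 0, `rx_complete` fires the waker, and the
        // load below observes `0 < threshold`, so the wait resolves exactly at
        // frame 10.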
1178
1179        let RxLeaseHandlingState { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } =
1180            self.state.lease_handling_state();
1181
1182        let prev = rx_frame_counter.fetch_sub(hold_until_frame, atomic::Ordering::SeqCst);
1183        // After having subtracted the waiting value we *must always restore the
1184        // value* on return, even if the future is not polled to completion.
1185        let _guard = scopeguard::guard((), |()| {
1186            let _: u64 = rx_frame_counter.fetch_add(hold_until_frame, atomic::Ordering::SeqCst);
1187        });
1188
1189        // Lease is ready to be fulfilled.
1190        if prev >= hold_until_frame {
1191            return;
1192        }
1193        // The threshold is computed with a wrapping subtraction. So now we must
1194        // wait until the value read from the atomic is LESS THAN the threshold.
1195        let threshold = prev.wrapping_sub(hold_until_frame);
1196        futures::future::poll_fn(|cx| {
1197            let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
1198            if v < threshold {
1199                return Poll::Ready(());
1200            }
1201            rx_lease_waker.register(cx.waker());
1202            let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
1203            if v < threshold {
1204                return Poll::Ready(());
1205            }
1206            Poll::Pending
1207        })
1208        .await;
1209    }
1210}
1211
1212#[cfg(test)]
1213mod tests {
1214    use super::*;
1215
1216    use assert_matches::assert_matches;
1217    use fuchsia_async as fasync;
1218    use futures::future::FutureExt;
1219    use test_case::test_case;
1220
1221    use std::collections::HashSet;
1222    use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize};
1223    use std::pin::pin;
1224    use std::task::{Poll, Waker};
1225
1226    const DEFAULT_MIN_TX_BUFFER_HEAD: u16 = 4;
1227    const DEFAULT_MIN_TX_BUFFER_TAIL: u16 = 8;
1228    // These unwraps can never panic because none of the values are zero.
1229    const DEFAULT_BUFFER_LENGTH: NonZeroUsize = NonZeroUsize::new(64).unwrap();
1230    const DEFAULT_TX_BUFFERS: NonZeroU16 = NonZeroU16::new(8).unwrap();
1231    const DEFAULT_RX_BUFFERS: NonZeroU16 = NonZeroU16::new(8).unwrap();
1232    const MAX_BUFFER_BYTES: usize = DEFAULT_BUFFER_LENGTH.get()
1233        * netdev::MAX_DESCRIPTOR_CHAIN as usize
1234        - DEFAULT_MIN_TX_BUFFER_HEAD as usize
1235        - DEFAULT_MIN_TX_BUFFER_TAIL as usize;
1236
1237    const SENTINEL_BYTE: u8 = 0xab;
1238    const WRITE_BYTE: u8 = 1;
1239    const PAD_BYTE: u8 = 0;
1240
1241    const DEFAULT_CONFIG: Config = Config {
1242        buffer_stride: NonZeroU64::new(DEFAULT_BUFFER_LENGTH.get() as u64).unwrap(),
1243        num_rx_buffers: DEFAULT_RX_BUFFERS,
1244        num_tx_buffers: DEFAULT_TX_BUFFERS,
1245        options: netdev::SessionFlags::empty(),
1246        buffer_layout: BufferLayout {
1247            length: DEFAULT_BUFFER_LENGTH.get(),
1248            min_tx_head: DEFAULT_MIN_TX_BUFFER_HEAD,
1249            min_tx_tail: DEFAULT_MIN_TX_BUFFER_TAIL,
1250            min_tx_data: 0,
1251        },
1252    };
1253
1254    impl Pool {
1255        fn new_test_default() -> Arc<Self> {
1256            let (pool, _descriptors, _data) =
1257                Pool::new(DEFAULT_CONFIG).expect("failed to create default pool");
1258            pool
1259        }
1260
1261        async fn alloc_tx_checked(self: &Arc<Self>, n: u8) -> AllocGuard<Tx> {
1262            self.alloc_tx(ChainLength::try_from(n).expect("failed to convert to chain length"))
1263                .await
1264        }
1265
1266        fn alloc_tx_now_or_never(self: &Arc<Self>, n: u8) -> Option<AllocGuard<Tx>> {
1267            self.alloc_tx_checked(n).now_or_never()
1268        }
1269
1270        fn alloc_tx_all(self: &Arc<Self>, n: u8) -> Vec<AllocGuard<Tx>> {
1271            std::iter::from_fn(|| self.alloc_tx_now_or_never(n)).collect()
1272        }
1273
1274        fn alloc_tx_buffer_now_or_never(self: &Arc<Self>, num_bytes: usize) -> Option<Buffer<Tx>> {
1275            self.alloc_tx_buffer(num_bytes)
1276                .now_or_never()
1277                .transpose()
1278                .expect("invalid arguments for alloc_tx_buffer")
1279        }
1280
1281        fn set_min_tx_buffer_length(self: &mut Arc<Self>, length: usize) {
1282            Arc::get_mut(self).unwrap().buffer_layout.min_tx_data = length;
1283        }
1284
1285        fn fill_sentinel_bytes(&mut self) {
1286            // Safety: We have mut reference to Pool, so we get to modify the
1287            // VMO pointed by self.base.
1288            unsafe { std::ptr::write_bytes(self.base.as_ptr(), SENTINEL_BYTE, self.bytes) };
1289        }
1290    }
1291
1292    impl Buffer<Tx> {
1293        // Writes a byte at `offset`; the resulting buffer should be `pad_size`
1294        // long, with 0..offset being SENTINEL_BYTE, offset being WRITE_BYTE, and
1295        // the rest being PAD_BYTE.
1296        fn check_write_and_pad(&mut self, offset: usize, pad_size: usize) {
1297            self.write_at(offset, &[WRITE_BYTE][..]).expect("failed to write to self");
1298            self.pad().expect("failed to pad");
1299            assert_eq!(self.len(), pad_size);
1300            // An arbitrary value that is not SENTINEL/WRITE/PAD_BYTE so that
1301            // we can make sure the write really happened.
1302            const INIT_BYTE: u8 = 42;
1303            let mut read_buf = vec![INIT_BYTE; pad_size];
1304            self.read_at(0, &mut read_buf[..]).expect("failed to read from self");
1305            for (idx, byte) in read_buf.iter().enumerate() {
1306                if idx < offset {
1307                    assert_eq!(*byte, SENTINEL_BYTE);
1308                } else if idx == offset {
1309                    assert_eq!(*byte, WRITE_BYTE);
1310                } else {
1311                    assert_eq!(*byte, PAD_BYTE);
1312                }
1313            }
1314        }
1315    }
1316
1317    impl<K, I, T> PartialEq<T> for Chained<DescId<K>>
1318    where
1319        K: AllocKind,
1320        I: ExactSizeIterator<Item = u16>,
1321        T: Copy + IntoIterator<IntoIter = I>,
1322    {
1323        fn eq(&self, other: &T) -> bool {
1324            let iter = other.into_iter();
1325            if usize::from(self.len) != iter.len() {
1326                return false;
1327            }
1328            self.iter().zip(iter).all(|(l, r)| l.get() == r)
1329        }
1330    }
1331
1332    impl Debug for TxAllocReq {
1333        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1334            let TxAllocReq { sender: _, size } = self;
1335            f.debug_struct("TxAllocReq").field("size", &size).finish_non_exhaustive()
1336        }
1337    }
1338
1339    #[test]
1340    fn alloc_tx_distinct() {
1341        let pool = Pool::new_test_default();
1342        let allocated = pool.alloc_tx_all(1);
1343        assert_eq!(allocated.len(), DEFAULT_TX_BUFFERS.get().into());
1344        let distinct = allocated
1345            .iter()
1346            .map(|alloc| {
1347                assert_eq!(alloc.descs.len(), 1);
1348                alloc.descs[0].get()
1349            })
1350            .collect::<HashSet<u16>>();
1351        assert_eq!(allocated.len(), distinct.len());
1352    }
1353
1354    #[test]
1355    fn alloc_tx_free_len() {
1356        let pool = Pool::new_test_default();
1357        {
1358            let allocated = pool.alloc_tx_all(2);
1359            assert_eq!(
1360                allocated.iter().fold(0, |acc, a| { acc + a.descs.len() }),
1361                DEFAULT_TX_BUFFERS.get().into()
1362            );
1363            assert_eq!(pool.tx_alloc_state.lock().free_list.len, 0);
1364        }
1365        assert_eq!(pool.tx_alloc_state.lock().free_list.len, DEFAULT_TX_BUFFERS.get());
1366    }
1367
1368    #[test]
1369    fn alloc_tx_chain() {
1370        let pool = Pool::new_test_default();
1371        let allocated = pool.alloc_tx_all(3);
1372        assert_eq!(allocated.len(), usize::from(DEFAULT_TX_BUFFERS.get()) / 3);
1373        assert_matches!(pool.alloc_tx_now_or_never(3), None);
1374        assert_matches!(pool.alloc_tx_now_or_never(2), Some(a) if a.descs.len() == 2);
1375    }
1376
1377    #[test]
1378    fn alloc_tx_many() {
1379        let pool = Pool::new_test_default();
1380        let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1381            - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1382            - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1383        let data_len = usize::try_from(data_len).unwrap();
1384        let mut buffers = pool
1385            .alloc_tx_buffers(data_len)
1386            .now_or_never()
1387            .expect("failed to alloc")
1388            .unwrap()
1389            // Collect into a vec so we keep the buffers alive; otherwise they
1390            // are immediately returned to the pool.
1391            .collect::<Result<Vec<_>>>()
1392            .expect("buffer error");
1393        assert_eq!(buffers.len(), DEFAULT_TX_BUFFERS.get().into());
1394
1395        // We have all the buffers, which means allocating more should not
1396        // resolve.
1397        assert!(pool.alloc_tx_buffers(data_len).now_or_never().is_none());
1398
1399        // If we release a single buffer we should be able to retrieve it again.
1400        assert_matches!(buffers.pop(), Some(_));
1401        let mut more_buffers =
1402            pool.alloc_tx_buffers(data_len).now_or_never().expect("failed to alloc").unwrap();
1403        let buffer = assert_matches!(more_buffers.next(), Some(Ok(b)) => b);
1404        assert_matches!(more_buffers.next(), None);
1405        // The iterator is fused, so None is yielded even after dropping the
1406        // buffer.
1407        drop(buffer);
1408        assert_matches!(more_buffers.next(), None);
1409    }
1410
1411    #[test]
1412    fn alloc_tx_after_free() {
1413        let pool = Pool::new_test_default();
1414        let mut allocated = pool.alloc_tx_all(1);
1415        assert_matches!(pool.alloc_tx_now_or_never(2), None);
1416        {
1417            let _drained = allocated.drain(..2);
1418        }
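        // Dropping the two drained allocations frees their descriptors, so a
        // 2-descriptor chain can now be allocated.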
1419        assert_matches!(pool.alloc_tx_now_or_never(2), Some(a) if a.descs.len() == 2);
1420    }
1421
1422    #[test]
1423    fn blocking_alloc_tx() {
1424        let mut executor = fasync::TestExecutor::new();
1425        let pool = Pool::new_test_default();
1426        let mut allocated = pool.alloc_tx_all(1);
1427        let alloc_fut = pool.alloc_tx_checked(1);
1428        let mut alloc_fut = pin!(alloc_fut);
1429        // The allocation should block.
1430        assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
1431        // And the allocation request should be queued.
1432        assert!(!pool.tx_alloc_state.lock().requests.is_empty());
1433        let freed = allocated
1434            .pop()
1435            .expect("no fulfilled allocations")
1436            .iter()
1437            .map(|x| x.get())
1438            .collect::<Chained<_>>();
1439        let same_as_freed =
1440            |descs: &Chained<DescId<Tx>>| descs.iter().map(|x| x.get()).eq(freed.iter().copied());
1441        // Now the task should be able to continue.
1442        assert_matches!(
1443            &executor.run_until_stalled(&mut alloc_fut),
1444            Poll::Ready(AllocGuard { descs, pool: _ }) if same_as_freed(descs)
1445        );
1446        // And the queued request should now be removed.
1447        assert!(pool.tx_alloc_state.lock().requests.is_empty());
1448    }
1449
1450    #[test]
1451    fn blocking_alloc_tx_cancel_before_free() {
1452        let mut executor = fasync::TestExecutor::new();
1453        let pool = Pool::new_test_default();
1454        let mut allocated = pool.alloc_tx_all(1);
1455        {
1456            let alloc_fut = pool.alloc_tx_checked(1);
1457            let mut alloc_fut = pin!(alloc_fut);
1458            assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
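            // Two requests should be queued: one introduced by `alloc_tx_all`
            // (its future was already dropped), plus the pending `alloc_fut`
            // above.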
1459            assert_matches!(
1460                pool.tx_alloc_state.lock().requests.as_slices(),
1461                (&[ref req1, ref req2], &[]) if req1.size.get() == 1 && req2.size.get() == 1
1462            );
1463        }
1464        assert_matches!(
1465            allocated.pop(),
1466            Some(AllocGuard { ref descs, pool: ref p })
1467                if descs == &[DEFAULT_TX_BUFFERS.get() - 1] && Arc::ptr_eq(p, &pool)
1468        );
1469        let state = pool.tx_alloc_state.lock();
1470        assert_eq!(state.free_list.len, 1);
1471        assert!(state.requests.is_empty());
1472    }
1473
1474    #[test]
1475    fn blocking_alloc_tx_cancel_after_free() {
1476        let mut executor = fasync::TestExecutor::new();
1477        let pool = Pool::new_test_default();
1478        let mut allocated = pool.alloc_tx_all(1);
1479        {
1480            let alloc_fut = pool.alloc_tx_checked(1);
1481            let mut alloc_fut = pin!(alloc_fut);
1482            assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
1483            assert_matches!(
1484                pool.tx_alloc_state.lock().requests.as_slices(),
1485                (&[ref req1, ref req2], &[]) if req1.size.get() == 1 && req2.size.get() == 1
1486            );
1487            assert_matches!(
1488                allocated.pop(),
1489                Some(AllocGuard { ref descs, pool: ref p })
1490                    if descs == &[DEFAULT_TX_BUFFERS.get() - 1] && Arc::ptr_eq(p, &pool)
1491            );
1492        }
1493        let state = pool.tx_alloc_state.lock();
1494        assert_eq!(state.free_list.len, 1);
1495        assert!(state.requests.is_empty());
1496    }
1497
1498    #[test]
1499    fn multiple_blocking_alloc_tx_fulfill_order() {
1500        const TASKS_TOTAL: usize = 3;
1501        let mut executor = fasync::TestExecutor::new();
1502        let pool = Pool::new_test_default();
1503        let mut allocated = pool.alloc_tx_all(1);
1504        let mut alloc_futs = (1..=TASKS_TOTAL)
1505            .rev()
1506            .map(|x| {
1507                let pool = pool.clone();
1508                (x, Box::pin(async move { pool.alloc_tx_checked(x.try_into().unwrap()).await }))
1509            })
1510            .collect::<Vec<_>>();
1511
1512        for (idx, (req_size, task)) in alloc_futs.iter_mut().enumerate() {
1513            assert_matches!(executor.run_until_stalled(task), Poll::Pending);
1514            // Assert that the tasks are sorted in decreasing order of requested size.
1515            assert_eq!(idx + *req_size, TASKS_TOTAL);
1516        }
1517        {
1518            let state = pool.tx_alloc_state.lock();
1519            // The first pending request was introduced by `alloc_tx_all`.
1520            assert_eq!(state.requests.len(), TASKS_TOTAL + 1);
1521            let mut requests = state.requests.iter();
1522            // It should already be cancelled because the requesting future has
1523            // already been dropped.
1524            assert!(requests.next().unwrap().sender.is_canceled());
1525            // The rest of the requests must not be cancelled.
1526            assert!(requests.all(|req| !req.sender.is_canceled()))
1527        }
1528
1529        let mut to_free = Vec::new();
1530        let mut freed = 0;
1531        for free_size in (1..=TASKS_TOTAL).rev() {
1532            let (_req_size, mut task) = alloc_futs.remove(0);
1533            for _ in 1..free_size {
1534                freed += 1;
1535                assert_matches!(
1536                    allocated.pop(),
1537                    Some(AllocGuard { ref descs, pool: ref p })
1538                        if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
1539                );
1540                assert_matches!(executor.run_until_stalled(&mut task), Poll::Pending);
1541            }
1542            freed += 1;
1543            assert_matches!(
1544                allocated.pop(),
1545                Some(AllocGuard { ref descs, pool: ref p })
1546                    if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
1547            );
1548            match executor.run_until_stalled(&mut task) {
1549                Poll::Ready(alloc) => {
1550                    assert_eq!(alloc.len(), free_size);
1551                    // Don't return the allocation to the pool now.
1552                    to_free.push(alloc);
1553                }
1554                Poll::Pending => panic!("The request should be fulfilled"),
1555            }
1556            // The rest of the requests cannot be fulfilled.
1557            for (_req_size, task) in alloc_futs.iter_mut() {
1558                assert_matches!(executor.run_until_stalled(task), Poll::Pending);
1559            }
1560        }
1561        assert!(pool.tx_alloc_state.lock().requests.is_empty());
1562    }
1563
1564    #[test]
1565    fn singleton_tx_layout() {
1566        let pool = Pool::new_test_default();
1567        let buffers = std::iter::from_fn(|| {
1568            let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1569                - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1570                - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1571            pool.alloc_tx_buffer_now_or_never(usize::try_from(data_len).unwrap()).map(|buffer| {
1572                assert_eq!(buffer.alloc.descriptors().count(), 1);
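                // A full-sized tx allocation still fits in one descriptor: the
                // head is reserved at the front, the tail at the back, and the
                // data region starts at `offset + head` within the pool.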
1573                let offset = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1574                    * u64::from(buffer.alloc[0].get());
1575                {
1576                    let descriptor = buffer.alloc.descriptor();
1577                    assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
1578                    assert_eq!(descriptor.head_length(), DEFAULT_MIN_TX_BUFFER_HEAD);
1579                    assert_eq!(descriptor.tail_length(), DEFAULT_MIN_TX_BUFFER_TAIL);
1580                    assert_eq!(descriptor.data_length(), data_len);
1581                    assert_eq!(descriptor.offset(), offset);
1582                }
1583
1584                assert_eq!(buffer.parts.len(), 1);
1585                let BufferPart { ptr, len, cap } = buffer.parts[0];
1586                assert_eq!(len, 0);
1587                assert_eq!(
1588                    // Using wrapping_add because we will never dereference the
1589                    // resulting pointer and it saves us an unsafe block.
1590                    pool.base.as_ptr().wrapping_add(
1591                        usize::try_from(offset).unwrap() + usize::from(DEFAULT_MIN_TX_BUFFER_HEAD),
1592                    ),
1593                    ptr
1594                );
1595                assert_eq!(data_len, u32::try_from(cap).unwrap());
1596                buffer
1597            })
1598        })
1599        .collect::<Vec<_>>();
1600        assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()));
1601    }
1602
1603    #[test]
1604    fn chained_tx_layout() {
1605        let pool = Pool::new_test_default();
1606        let alloc_len = 4 * DEFAULT_BUFFER_LENGTH.get()
1607            - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1608            - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1609        let buffers = std::iter::from_fn(|| {
1610            pool.alloc_tx_buffer_now_or_never(alloc_len).map(|buffer| {
1611                assert_eq!(buffer.parts.len(), 4);
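                // Expected layout: each descriptor's offset is its descriptor
                // id times the default buffer length; head space is reserved
                // only in the first descriptor of the chain and tail space only
                // in the last, with the data region covering the rest of each
                // buffer.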
1612                for (idx, descriptor) in buffer.alloc.descriptors().enumerate() {
1613                    let chain_length = ChainLength::try_from(buffer.alloc.len() - idx - 1).unwrap();
1614                    let head_length = if idx == 0 { DEFAULT_MIN_TX_BUFFER_HEAD } else { 0 };
1615                    let tail_length = if chain_length == ChainLength::ZERO {
1616                        DEFAULT_MIN_TX_BUFFER_TAIL
1617                    } else {
1618                        0
1619                    };
1620                    let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1621                        - u32::from(head_length)
1622                        - u32::from(tail_length);
1623                    let offset = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1624                        * u64::from(buffer.alloc[idx].get());
1625                    assert_eq!(descriptor.chain_length().unwrap(), chain_length);
1626                    assert_eq!(descriptor.head_length(), head_length);
1627                    assert_eq!(descriptor.tail_length(), tail_length);
1628                    assert_eq!(descriptor.offset(), offset);
1629                    assert_eq!(descriptor.data_length(), data_len);
1630                    if chain_length != ChainLength::ZERO {
1631                        assert_eq!(descriptor.nxt(), Some(buffer.alloc[idx + 1].get()));
1632                    }
1633
1634                    let BufferPart { ptr, cap, len } = buffer.parts[idx];
1635                    assert_eq!(len, 0);
1636                    assert_eq!(
1637                        // Using wrapping_add because we will never dereference
1638                        // the resulting ptr and it saves us an unsafe block.
1639                        pool.base.as_ptr().wrapping_add(
1640                            usize::try_from(offset).unwrap() + usize::from(head_length),
1641                        ),
1642                        ptr
1643                    );
1644                    assert_eq!(data_len, u32::try_from(cap).unwrap());
1645                }
1646                buffer
1647            })
1648        })
1649        .collect::<Vec<_>>();
1650        assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()) / 4);
1651    }
1652
1653    #[test]
1654    fn rx_distinct() {
1655        let pool = Pool::new_test_default();
1656        let mut guard = pool.rx_pending.inner.lock();
1657        let (descs, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
1658        assert_eq!(descs.len(), usize::from(DEFAULT_RX_BUFFERS.get()));
1659        let distinct = descs.iter().map(|desc| desc.get()).collect::<HashSet<u16>>();
1660        assert_eq!(descs.len(), distinct.len());
1661    }
1662
1663    #[test]
1664    fn alloc_rx_layout() {
1665        let pool = Pool::new_test_default();
1666        let mut guard = pool.rx_pending.inner.lock();
1667        let (descs, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
1668        assert_eq!(descs.len(), usize::from(DEFAULT_RX_BUFFERS.get()));
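        // Rx descriptors reserve no head or tail space; the entire buffer is
        // the data region.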
1669        for desc in descs.iter() {
1670            let descriptor = pool.descriptors.borrow(desc);
1671            let offset =
1672                u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap() * u64::from(desc.get());
1673            assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
1674            assert_eq!(descriptor.head_length(), 0);
1675            assert_eq!(descriptor.tail_length(), 0);
1676            assert_eq!(descriptor.offset(), offset);
1677            assert_eq!(
1678                descriptor.data_length(),
1679                u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1680            );
1681        }
1682    }
1683
1684    #[test]
1685    fn buffer_read_at_write_at() {
1686        let pool = Pool::new_test_default();
1687        let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
1688        let mut buffer =
1689            pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
1690        // Because we have to accommodate the space for the head and tail, the
1691        // buffer has 2 parts instead of 1.
1692        assert_eq!(buffer.parts.len(), 2);
1693        assert_eq!(buffer.cap(), alloc_bytes);
1694        let write_buf = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();
1695        buffer.write_at(0, &write_buf[..]).expect("failed to write into buffer");
1696        let mut read_buf = [0xff; DEFAULT_BUFFER_LENGTH.get()];
1697        buffer.read_at(0, &mut read_buf[..]).expect("failed to read from buffer");
1698        for (idx, byte) in read_buf.iter().enumerate() {
1699            assert_eq!(*byte, write_buf[idx]);
1700        }
1701    }
1702
1703    #[test]
1704    fn buffer_read_write_seek() {
1705        let pool = Pool::new_test_default();
1706        let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
1707        let mut buffer =
1708            pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
1709        // Because we have to accommodate the space for the head and tail, the
1710        // buffer has 2 parts instead of 1.
1711        assert_eq!(buffer.parts.len(), 2);
1712        assert_eq!(buffer.cap(), alloc_bytes);
1713        let write_buf = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();
1714        assert_eq!(
1715            buffer.write(&write_buf[..]).expect("failed to write into buffer"),
1716            write_buf.len()
1717        );
1718        const SEEK_FROM_END: usize = 64;
1719        const READ_LEN: usize = 12;
1720        assert_eq!(
1721            buffer.seek(SeekFrom::End(-i64::try_from(SEEK_FROM_END).unwrap())).unwrap(),
1722            u64::try_from(buffer.cap() - SEEK_FROM_END).unwrap()
1723        );
1724        let mut read_buf = [0xff; READ_LEN];
1725        assert_eq!(
1726            buffer.read(&mut read_buf[..]).expect("failed to read from buffer"),
1727            read_buf.len()
1728        );
1729        assert_eq!(&write_buf[..READ_LEN], &read_buf[..]);
1730    }
1731
1732    #[test_case(32; "single buffer part")]
1733    #[test_case(MAX_BUFFER_BYTES; "multiple buffer parts")]
1734    fn buffer_pad(pad_size: usize) {
1735        let mut pool = Pool::new_test_default();
1736        pool.set_min_tx_buffer_length(pad_size);
1737        for offset in 0..pad_size {
1738            Arc::get_mut(&mut pool)
1739                .expect("there are multiple owners of the underlying VMO")
1740                .fill_sentinel_bytes();
1741            let mut buffer =
1742                pool.alloc_tx_buffer_now_or_never(pad_size).expect("failed to allocate buffer");
1743            buffer.check_write_and_pad(offset, pad_size);
1744        }
1745    }
1746
1747    #[test]
1748    fn buffer_pad_grow() {
1749        const BUFFER_PARTS: u8 = 3;
1750        let mut pool = Pool::new_test_default();
1751        let pad_size = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1752            * u32::from(BUFFER_PARTS)
1753            - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1754            - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1755        pool.set_min_tx_buffer_length(pad_size.try_into().unwrap());
1756        for offset in 0..pad_size - u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap() {
1757            Arc::get_mut(&mut pool)
1758                .expect("there are multiple owners of the underlying VMO")
1759                .fill_sentinel_bytes();
1760            let mut alloc =
1761                pool.alloc_tx_now_or_never(BUFFER_PARTS).expect("failed to alloc descriptors");
1762            alloc
1763                .init(
1764                    DEFAULT_BUFFER_LENGTH.get() * usize::from(BUFFER_PARTS)
1765                        - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1766                        - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL),
1767                )
1768                .expect("head/body/tail sizes are representable with u16/u32/u16");
1769            let mut buffer = Buffer::try_from(alloc).unwrap();
1770            buffer.check_write_and_pad(offset.try_into().unwrap(), pad_size.try_into().unwrap());
1771        }
1772    }
1773
1774    #[test_case(  0; "writes at the beginning")]
1775    #[test_case( 15; "writes in the first part")]
1776    #[test_case( 75; "writes in the second part")]
1777    #[test_case(135; "writes in the third part")]
1778    #[test_case(195; "writes in the last part")]
1779    fn buffer_used(write_offset: usize) {
1780        let pool = Pool::new_test_default();
1781        let mut buffer =
1782            pool.alloc_tx_buffer_now_or_never(MAX_BUFFER_BYTES).expect("failed to allocate buffer");
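        // Expected capacity of each part: the first loses the head bytes, the
        // last loses the tail bytes, and the parts in between span the full
        // buffer length.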
1783        let expected_caps = (0..netdev::MAX_DESCRIPTOR_CHAIN).map(|i| {
1784            if i == 0 {
1785                DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1786            } else if i < netdev::MAX_DESCRIPTOR_CHAIN - 1 {
1787                DEFAULT_BUFFER_LENGTH.get()
1788            } else {
1789                DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL)
1790            }
1791        });
1792        assert_eq!(buffer.alloc.len(), netdev::MAX_DESCRIPTOR_CHAIN.into());
1793        buffer.write_at(write_offset, &[WRITE_BYTE][..]).expect("failed to write to buffer");
1794        // The accumulator is Some if we haven't found the part where the byte
1795        // was written, None if we've already found it.
1796        assert_eq!(
1797            buffer.parts.iter().zip(expected_caps).fold(
1798                Some(write_offset),
1799                |offset, (part, expected_cap)| {
1800                    // The cap must match the expectation.
1801                    assert_eq!(part.cap, expected_cap);
1802
1803                    match offset {
1804                        Some(offset) => {
1805                            if offset >= expected_cap {
1806                                // The part should have used all the capacity.
1807                                assert_eq!(part.len, part.cap);
1808                                Some(offset - part.len)
1809                            } else {
1810                                // The part should end right after our byte.
1811                                assert_eq!(part.len, offset + 1);
1812                                let mut buf = [0];
1813                                // Verify that the byte is indeed written.
1814                                assert_matches!(part.read_at(offset, &mut buf), Ok(1));
1815                                assert_eq!(buf[0], WRITE_BYTE);
1816                                None
1817                            }
1818                        }
1819                        None => {
1820                            // We should have never written in this part.
1821                            assert_eq!(part.len, 0);
1822                            None
1823                        }
1824                    }
1825                }
1826            ),
1827            None
1828        )
1829    }
1830
1831    #[test]
1832    fn buffer_commit() {
1833        let pool = Pool::new_test_default();
1834        for offset in 0..MAX_BUFFER_BYTES {
1835            let mut buffer = pool
1836                .alloc_tx_buffer_now_or_never(MAX_BUFFER_BYTES)
1837                .expect("failed to allocate buffer");
1838            buffer.write_at(offset, &[1][..]).expect("failed to write to buffer");
1839            buffer.commit();
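            // After commit(), each part's length should have been written back
            // to its descriptor's data length, and head + data + tail should
            // still add up to the full buffer length.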
1840            for (part, descriptor) in buffer.parts.iter().zip(buffer.alloc.descriptors()) {
1841                let head_length = descriptor.head_length();
1842                let tail_length = descriptor.tail_length();
1843                let data_length = descriptor.data_length();
1844                assert_eq!(u32::try_from(part.len).unwrap(), data_length);
1845                assert_eq!(
1846                    u32::from(head_length + tail_length) + data_length,
1847                    u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap(),
1848                );
1849            }
1850        }
1851    }
1852
1853    #[test]
1854    fn allocate_under_device_minimum() {
1855        const MIN_TX_DATA: usize = 32;
1856        const ALLOC_SIZE: usize = 16;
1857        const WRITE_BYTE: u8 = 0xff;
1858        const WRITE_SENTINEL_BYTE: u8 = 0xee;
1859        const READ_SENTINEL_BYTE: u8 = 0xdd;
1860        let mut config = DEFAULT_CONFIG;
1861        config.buffer_layout.min_tx_data = MIN_TX_DATA;
1862        let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create a new pool");
1863        for mut buffer in Vec::from_iter(std::iter::from_fn({
1864            let pool = pool.clone();
1865            move || pool.alloc_tx_buffer_now_or_never(MIN_TX_DATA)
1866        })) {
1867            buffer.write_at(0, &[WRITE_SENTINEL_BYTE; MIN_TX_DATA]).expect("failed to write");
1868        }
1869        let mut allocated =
1870            pool.alloc_tx_buffer_now_or_never(ALLOC_SIZE).expect("failed to allocate buffer");
1871        assert_eq!(allocated.cap(), ALLOC_SIZE);
1872        const WRITE_BUF_SIZE: usize = ALLOC_SIZE + 1;
1873        assert_matches!(
1874            allocated.write_at(0, &[WRITE_BYTE; WRITE_BUF_SIZE]),
1875            Err(Error::TooSmall { size: ALLOC_SIZE, offset: 0, length: WRITE_BUF_SIZE })
1876        );
1877        allocated.write_at(0, &[WRITE_BYTE; ALLOC_SIZE]).expect("failed to write to buffer");
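        // pad() grows the buffer to the configured minimum tx data length; the
        // padded region reads back as zeros.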
1878        assert_matches!(allocated.pad(), Ok(()));
1879        assert_eq!(allocated.cap(), MIN_TX_DATA);
1880        assert_eq!(allocated.len(), MIN_TX_DATA);
1881        const READ_BUF_SIZE: usize = MIN_TX_DATA + 1;
1882        let mut read_buf = [READ_SENTINEL_BYTE; READ_BUF_SIZE];
1883        assert_matches!(
1884            allocated.read_at(0, &mut read_buf[..]),
1885            Err(Error::TooSmall { size: MIN_TX_DATA, offset: 0, length: READ_BUF_SIZE })
1886        );
1887        allocated.read_at(0, &mut read_buf[..MIN_TX_DATA]).expect("failed to read from buffer");
1888        assert_eq!(&read_buf[..ALLOC_SIZE], &[WRITE_BYTE; ALLOC_SIZE][..]);
1889        assert_eq!(&read_buf[ALLOC_SIZE..MIN_TX_DATA], &[0x0; ALLOC_SIZE][..]);
1890        assert_eq!(&read_buf[MIN_TX_DATA..], &[READ_SENTINEL_BYTE; 1][..]);
1891    }
1892
1893    #[test]
1894    fn invalid_tx_length() {
1895        let mut config = DEFAULT_CONFIG;
1896        config.buffer_layout.length = usize::from(u16::MAX) + 2;
1897        config.buffer_layout.min_tx_head = 0;
1898        let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create pool");
1899        assert_matches!(pool.alloc_tx_buffer(1).now_or_never(), Some(Err(Error::TxLength)));
1900    }
1901
1902    #[test]
1903    fn rx_leases() {
1904        let mut executor = fuchsia_async::TestExecutor::new();
1905        let state = RxLeaseHandlingState::new_with_enabled(true);
1906        let mut watcher = RxLeaseWatcher { state: &state };
1907
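        // `wait_until(n)` should resolve once at least `n` rx frames have been
        // marked complete via `rx_complete`.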
1908        {
1909            let mut fut = pin!(watcher.wait_until(0));
1910            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
1911        }
1912        {
1913            state.rx_complete();
1914            let mut fut = pin!(watcher.wait_until(1));
1915            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
1916        }
1917        {
1918            let mut fut = pin!(watcher.wait_until(0));
1919            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
1920        }
1921        {
1922            let mut fut = pin!(watcher.wait_until(3));
1923            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
1924            state.rx_complete();
1925            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
1926            state.rx_complete();
1927            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
1928        }
1929        // Dropping the wait future without seeing it complete restores the
1930        // frame counter value.
1931        let counter_before = state.rx_frame_counter.load(atomic::Ordering::SeqCst);
1932        {
1933            let mut fut = pin!(watcher.wait_until(10000));
1934            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
1935        }
1936        let counter_after = state.rx_frame_counter.load(atomic::Ordering::SeqCst);
1937        assert_eq!(counter_before, counter_after);
1938    }
1939}