netdevice_client/session/buffer/pool.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Fuchsia netdevice buffer pool.
6
7use fuchsia_sync::Mutex;
8use futures::task::AtomicWaker;
9use std::borrow::Borrow;
10use std::collections::VecDeque;
11use std::convert::TryInto as _;
12use std::fmt::Debug;
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::mem::MaybeUninit;
15use std::num::TryFromIntError;
16use std::ops::{Deref, DerefMut};
17use std::ptr::NonNull;
18use std::sync::Arc;
19use std::sync::atomic::{self, AtomicBool, AtomicU64};
20use std::task::Poll;
21
22use arrayvec::ArrayVec;
23use explicit::ResultExt as _;
24use fidl_fuchsia_hardware_network as netdev;
25use fuchsia_runtime::vmar_root_self;
26use futures::channel::oneshot::{Receiver, Sender, channel};
27
28use super::{ChainLength, DescId, DescRef, DescRefMut, Descriptors};
29use crate::error::{Error, Result};
30use crate::session::{BufferLayout, Config, Pending, Port};
31
32/// Responsible for managing [`Buffer`]s for a [`Session`](crate::session::Session).
33pub(in crate::session) struct Pool {
34    /// Base address of the pool.
35    // Note: This field requires us to manually implement `Sync` and `Send`.
36    base: NonNull<u8>,
37    /// The length of the pool in bytes.
38    bytes: usize,
39    /// The descriptors allocated for the pool.
40    descriptors: Descriptors,
41    /// Shared state for allocation.
42    tx_alloc_state: Mutex<TxAllocState>,
43    /// The free rx descriptors pending to be sent to driver.
44    pub(in crate::session) rx_pending: Pending<Rx>,
45    /// The buffer layout.
46    buffer_layout: BufferLayout,
47    /// State-keeping allowing sessions to handle rx leases.
48    rx_leases: RxLeaseHandlingState,
49}
50
51// `Pool` is `Send` and `Sync`, and this allows the compiler to deduce `Buffer`
52// to be `Send`. These impls are safe because `Pool` can be sent and `&Pool`
53// can be shared across threads safely: the implementation never allocates the
54// same buffer to two callers at the same time.
55unsafe impl Send for Pool {}
56unsafe impl Sync for Pool {}
57
58/// The shared state which keeps track of available buffers and tx buffers.
59struct TxAllocState {
60    /// All pending tx allocation requests.
61    requests: VecDeque<TxAllocReq>,
62    free_list: TxFreeList,
63}
64
65/// We use a linked list to maintain the free tx descriptors - they are linked
66/// through their `nxt` fields. Note that this differs from the chaining
67/// expected by the network device protocol:
68/// - You can chain more than [`netdev::MAX_DESCRIPTOR_CHAIN`] descriptors
69///   together.
70/// - The free list ends when the `nxt` field is 0xff, while a normal chain
71///   ends when `chain_length` becomes 0.
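/// As an illustration (added for clarity, not part of the protocol docs): a
/// free list may link five descriptors d0 -> d1 -> d2 -> d3 -> d4 with d4's
/// `nxt` set to 0xff, even though a protocol chain is capped at
/// [`netdev::MAX_DESCRIPTOR_CHAIN`] descriptors and instead carries decreasing
/// `chain_length` values (e.g. 3, 2, 1, 0 for a four-part chain).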
72struct TxFreeList {
73    /// The head of a linked list of available descriptors that can be allocated
74    /// for tx.
75    head: Option<DescId<Tx>>,
76    /// How many free descriptors are there in the pool.
77    len: u16,
78}
79
80impl Pool {
81    /// Creates a new [`Pool`] and its backing [`zx::Vmo`]s.
82    ///
83    /// Returns [`Pool`] and the [`zx::Vmo`]s for descriptors and data, in that
84    /// order.
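    ///
    /// # Example
    ///
    /// A minimal sketch (illustrative, not compiled here; `config` is assumed
    /// to be a valid in-crate [`Config`], e.g. the `DEFAULT_CONFIG` used in
    /// the tests below):
    ///
    /// ```ignore
    /// let (pool, descriptors_vmo, data_vmo) = Pool::new(config)?;
    /// // The two VMOs are shared with the device as part of session setup,
    /// // while `pool` stays on the client side to manage buffers.
    /// ```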
85    pub(in crate::session) fn new(config: Config) -> Result<(Arc<Self>, zx::Vmo, zx::Vmo)> {
86        let Config { buffer_stride, num_rx_buffers, num_tx_buffers, options, buffer_layout } =
87            config;
88        let num_buffers = num_rx_buffers.get() + num_tx_buffers.get();
89        let (descriptors, descriptors_vmo, tx_free, mut rx_free) =
90            Descriptors::new(num_tx_buffers, num_rx_buffers, buffer_stride)?;
91
92        // Construct the free list.
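        // Illustrative note (added for clarity): if `tx_free` yields
        // descriptors [d0, d1, d2], the reversed fold links d0 -> d1 -> d2
        // (d2's `nxt` stays unset) and leaves `free_head = Some(d0)`.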
93        let free_head = tx_free.into_iter().rev().fold(None, |head, mut curr| {
94            descriptors.borrow_mut(&mut curr).set_nxt(head);
95            Some(curr)
96        });
97
98        for rx_desc in rx_free.iter_mut() {
99            descriptors.borrow_mut(rx_desc).initialize(
100                ChainLength::ZERO,
101                0,
102                buffer_layout.length.try_into().unwrap(),
103                0,
104            );
105        }
106
107        let tx_alloc_state = TxAllocState {
108            free_list: TxFreeList { head: free_head, len: num_tx_buffers.get() },
109            requests: VecDeque::new(),
110        };
111
112        let size = buffer_stride.get() * u64::from(num_buffers);
113        let data_vmo = zx::Vmo::create(size).map_err(|status| Error::Vmo("data", status))?;
114
115        const VMO_NAME: zx::Name =
116            const_unwrap::const_unwrap_result(zx::Name::new("netdevice:data"));
117        data_vmo.set_name(&VMO_NAME).map_err(|status| Error::Vmo("set name", status))?;
118        // `as` is OK because `size` is positive and smaller than isize::MAX.
119        // This is following the practice of rust stdlib to ensure allocation
120        // size never reaches isize::MAX.
121        // https://doc.rust-lang.org/std/primitive.pointer.html#method.add-1.
122        let len = isize::try_from(size).expect("VMO size larger than isize::MAX") as usize;
123        // The returned address of zx_vmar_map on success must be non-zero:
124        // https://fuchsia.dev/fuchsia-src/reference/syscalls/vmar_map
125        let base = NonNull::new(
126            vmar_root_self()
127                .map(0, &data_vmo, 0, len, zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE)
128                .map_err(|status| Error::Map("data", status))? as *mut u8,
129        )
130        .unwrap();
131
132        Ok((
133            Arc::new(Pool {
134                base,
135                bytes: len,
136                descriptors,
137                tx_alloc_state: Mutex::new(tx_alloc_state),
138                rx_pending: Pending::new(rx_free),
139                buffer_layout,
140                rx_leases: RxLeaseHandlingState::new_with_flags(options),
141            }),
142            descriptors_vmo,
143            data_vmo,
144        ))
145    }
146
147    /// Allocates `num_parts` tx descriptors.
148    ///
149    /// It will block if there are not enough descriptors. Note that the
150    /// descriptors are not initialized; you need to call [`AllocGuard::init()`]
151    /// on the returned [`AllocGuard`] if you want to send it to the driver
152    /// later. See [`AllocGuard<Rx>::into_tx()`] for an example where
153    /// [`AllocGuard::init()`] is not needed because the tx allocation will be
154    /// returned to the pool immediately and won't be sent to the driver.
155    pub(in crate::session) async fn alloc_tx(
156        self: &Arc<Self>,
157        num_parts: ChainLength,
158    ) -> AllocGuard<Tx> {
159        let receiver = {
160            let mut state = self.tx_alloc_state.lock();
161            match state.free_list.try_alloc(num_parts, &self.descriptors) {
162                Some(allocated) => {
163                    return AllocGuard::new(allocated, self.clone());
164                }
165                None => {
166                    let (request, receiver) = TxAllocReq::new(num_parts);
167                    state.requests.push_back(request);
168                    receiver
169                }
170            }
171        };
172        // The sender must not be dropped.
173        receiver.await.unwrap()
174    }
175
176    /// Allocates a tx [`Buffer`].
177    ///
178    /// The returned buffer will have `num_bytes` as its capacity; the method
179    /// will block if there are not enough buffers. An error is returned if the
180    /// requested size cannot meet the device requirement, for example, if the
181    /// size of the head or tail region would become unrepresentable in u16.
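    ///
    /// # Example
    ///
    /// A minimal sketch (illustrative, not compiled here; `pool` is the
    /// session's [`Pool`] and `frame` is a caller-provided byte slice):
    ///
    /// ```ignore
    /// let mut buffer = pool.alloc_tx_buffer(frame.len()).await?;
    /// buffer.write_at(0, &frame)?;
    /// ```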
182    pub(in crate::session) async fn alloc_tx_buffer(
183        self: &Arc<Self>,
184        num_bytes: usize,
185    ) -> Result<Buffer<Tx>> {
186        self.alloc_tx_buffers(num_bytes).await?.next().unwrap()
187    }
188
189    /// Waits for at least one TX buffer to be available and returns an iterator
190    /// of buffers with `num_bytes` as capacity.
191    ///
192    /// The returned iterator is guaranteed to yield at least one item (though
193    /// it might be an error if the requested size cannot meet the device
194    /// requirement).
195    ///
196    /// # Note
197    ///
198    /// Because a `Buffer<Tx>` is returned to the pool when it's dropped, the
199    /// returned iterator can seemingly yield an endless stream of items if the
200    /// yielded `Buffer`s are dropped while iterating.
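    ///
    /// A sketch of the intended use (illustrative, not compiled here;
    /// `frame_len` is a caller-chosen size), mirroring the `alloc_tx_many`
    /// test below: collect the yielded buffers so they stay alive instead of
    /// being returned to the pool while iterating.
    ///
    /// ```ignore
    /// let buffers = pool
    ///     .alloc_tx_buffers(frame_len)
    ///     .await?
    ///     .collect::<Result<Vec<_>>>()?;
    /// ```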
201    pub(in crate::session) async fn alloc_tx_buffers<'a>(
202        self: &'a Arc<Self>,
203        num_bytes: usize,
204    ) -> Result<impl Iterator<Item = Result<Buffer<Tx>>> + 'a> {
205        let BufferLayout { min_tx_data, min_tx_head, min_tx_tail, length: buffer_length } =
206            self.buffer_layout;
207        let tx_head = usize::from(min_tx_head);
208        let tx_tail = usize::from(min_tx_tail);
209        let total_bytes = num_bytes.max(min_tx_data) + tx_head + tx_tail;
210        let num_parts = (total_bytes + buffer_length - 1) / buffer_length;
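        // Worked example (added for clarity, using the defaults from the tests
        // below: 64-byte buffers, 4-byte head, 8-byte tail): a 100-byte request
        // needs 100 + 4 + 8 = 112 bytes in total, so the ceiling division above
        // yields (112 + 63) / 64 = 2 descriptors.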
211        let chain_length = ChainLength::try_from(num_parts)?;
212        let first = self.alloc_tx(chain_length).await;
213        let iter = std::iter::once(first)
214            .chain(std::iter::from_fn(move || {
215                let mut state = self.tx_alloc_state.lock();
216                state
217                    .free_list
218                    .try_alloc(chain_length, &self.descriptors)
219                    .map(|allocated| AllocGuard::new(allocated, self.clone()))
220            }))
221            // Fuse afterwards so we're guaranteeing we can't see a new entry
222            // after having yielded `None` once.
223            .fuse()
224            .map(move |mut alloc| {
225                alloc.init(num_bytes)?;
226                Ok(alloc.into())
227            });
228        Ok(iter)
229    }
230
231    /// Frees rx descriptors.
232    pub(in crate::session) fn free_rx(&self, descs: impl IntoIterator<Item = DescId<Rx>>) {
233        self.rx_pending.extend(descs.into_iter().map(|mut desc| {
234            self.descriptors.borrow_mut(&mut desc).initialize(
235                ChainLength::ZERO,
236                0,
237                self.buffer_layout.length.try_into().unwrap(),
238                0,
239            );
240            desc
241        }));
242    }
243
244    /// Frees tx descriptors.
245    ///
246    /// # Panics
247    ///
248    /// Panics if given an empty chain.
249    fn free_tx(self: &Arc<Self>, chain: Chained<DescId<Tx>>) {
250        // We store any pending requests that need to be fulfilled on the stack
251        // here and fulfill them only once we drop the lock, guaranteeing an
252        // AllocGuard can't be dropped while the lock is held.
253        let mut to_fulfill = ArrayVec::<
254            (TxAllocReq, AllocGuard<Tx>),
255            { netdev::MAX_DESCRIPTOR_CHAIN as usize },
256        >::new();
257
258        let mut state = self.tx_alloc_state.lock();
259
260        {
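            // Illustrative note (added for clarity): this block splices the
            // freed chain onto the front of the free list. Assuming the
            // chain's internal `nxt` links are already in place (only the
            // tail's `nxt` is rewritten here), freeing c1 -> c2 -> c3 into a
            // free list f1 -> f2 produces c1 -> c2 -> c3 -> f1 -> f2 with c1
            // as the new head.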
261            let mut descs = chain.into_iter();
262            // The following can't overflow because we can have at most u16::MAX
263            // descriptors: free_len + #(to_free) + #(descs in use) <= u16::MAX,
264            // thus free_len + #(to_free) <= u16::MAX.
265            state.free_list.len += u16::try_from(descs.len()).unwrap();
266            let head = descs.next();
267            let old_head = std::mem::replace(&mut state.free_list.head, head);
268            let mut tail = descs.last();
269            let mut tail_ref = self.descriptors.borrow_mut(
270                tail.as_mut().unwrap_or_else(|| state.free_list.head.as_mut().unwrap()),
271            );
272            tail_ref.set_nxt(old_head);
273        }
274
275        // After putting the chain back into the free list, we try to fulfill
276        // any pending tx allocation requests.
277        while let Some(req) = state.requests.front() {
278            // Skip a request that we know is canceled.
279            //
280            // This is an optimization for long-ago dropped requests, since the
281            // receiver side can be dropped between here and fulfillment later.
282            if req.sender.is_canceled() {
283                let _cancelled: Option<TxAllocReq> = state.requests.pop_front();
284                continue;
285            }
286            let size = req.size;
287            match state.free_list.try_alloc(size, &self.descriptors) {
288                Some(descs) => {
289                    // The unwrap is safe because we know requests is not empty.
290                    let req = state.requests.pop_front().unwrap();
291                    to_fulfill.push((req, AllocGuard::new(descs, self.clone())));
292
293                    // If we're full, temporarily release the lock and go again
294                    // later. Fulfilling a request must _always_ be done without
295                    // holding the lock.
296                    if to_fulfill.is_full() {
297                        drop(state);
298                        for (req, alloc) in to_fulfill.drain(..) {
299                            req.fulfill(alloc)
300                        }
301                        state = self.tx_alloc_state.lock();
302                    }
303                }
304                None => break,
305            }
306        }
307
308        // Make sure we're not holding the state lock when fulfilling requests.
309        drop(state);
310        // Fulfill any ready requests.
311        for (req, alloc) in to_fulfill {
312            req.fulfill(alloc)
313        }
314    }
315
316    /// Frees the completed tx descriptors chained from `head` back to the pool.
317    ///
318    /// Call this function when the driver hands back a completed tx descriptor.
319    pub(in crate::session) fn tx_completed(self: &Arc<Self>, head: DescId<Tx>) -> Result<()> {
320        let chain = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
321        Ok(self.free_tx(chain))
322    }
323
324    /// Creates a [`Buffer<Rx>`] corresponding to the completed rx descriptors.
325    ///
326    /// Whenever the driver hands back a completed rx descriptor, this function
327    /// can be used to create the buffer that is represented by those chained
328    /// descriptors.
329    pub(in crate::session) fn rx_completed(
330        self: &Arc<Self>,
331        head: DescId<Rx>,
332    ) -> Result<Buffer<Rx>> {
333        let descs = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
334        let alloc = AllocGuard::new(descs, self.clone());
335        Ok(alloc.into())
336    }
337}
338
339impl Drop for Pool {
340    fn drop(&mut self) {
341        unsafe {
342            vmar_root_self()
343                .unmap(self.base.as_ptr() as usize, self.bytes)
344                .expect("failed to unmap VMO for Pool")
345        }
346    }
347}
348
349impl TxFreeList {
350    /// Tries to allocate tx descriptors.
351    ///
352    /// Returns [`None`] if there are not enough descriptors.
353    fn try_alloc(
354        &mut self,
355        num_parts: ChainLength,
356        descriptors: &Descriptors,
357    ) -> Option<Chained<DescId<Tx>>> {
358        if u16::from(num_parts.get()) > self.len {
359            return None;
360        }
361
362        let free_list = std::iter::from_fn(|| -> Option<DescId<Tx>> {
363            let new_head = self.head.as_ref().and_then(|head| {
364                let nxt = descriptors.borrow(head).nxt();
365                nxt.map(|id| unsafe {
366                    // Safety: This is the nxt field of head of the free list,
367                    // it must be a tx descriptor id.
368                    DescId::from_raw(id)
369                })
370            });
371            std::mem::replace(&mut self.head, new_head)
372        });
373        let allocated = free_list.take(num_parts.get().into()).collect::<Chained<_>>();
374        assert_eq!(allocated.len(), num_parts.into());
375        self.len -= u16::from(num_parts.get());
376        Some(allocated)
377    }
378}
379
380/// The buffer that can be used by the [`Session`](crate::session::Session).
381///
382/// All [`Buffer`]s implement [`std::io::Read`] and [`Buffer<Tx>`]s implement
383/// [`std::io::Write`].
384pub struct Buffer<K: AllocKind> {
385    /// The descriptors allocation.
386    alloc: AllocGuard<K>,
387    /// Underlying memory regions.
388    parts: Chained<BufferPart>,
389    /// The current absolute position to read/write within the [`Buffer`].
390    pos: usize,
391}
392
393impl<K: AllocKind> Debug for Buffer<K> {
394    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
395        let Self { alloc, parts, pos } = self;
396        f.debug_struct("Buffer")
397            .field("cap", &self.cap())
398            .field("alloc", alloc)
399            .field("parts", parts)
400            .field("pos", pos)
401            .finish()
402    }
403}
404
405impl<K: AllocKind> Buffer<K> {
406    /// Gets the capacity of the buffer in bytes as requested for allocation.
407    pub fn cap(&self) -> usize {
408        self.parts.iter().fold(0, |acc, part| acc + part.cap)
409    }
410
411    /// Gets the length of the buffer which is actually used.
412    pub fn len(&self) -> usize {
413        self.parts.iter().fold(0, |acc, part| acc + part.len)
414    }
415
416    /// Writes bytes to the buffer.
417    ///
418    /// Writes all of `src` into the buffer beginning at `offset`. Returns
419    /// [`Error::TooSmall`] if the buffer capacity is not large enough to hold
420    /// `offset + src.len()` bytes; partial writes are not performed.
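    ///
    /// A minimal sketch of usage (illustrative, not compiled here; `buffer` is
    /// assumed to be a `Buffer<Tx>` with at least 4 bytes of capacity):
    ///
    /// ```ignore
    /// buffer.write_at(0, &[0xde, 0xad, 0xbe, 0xef])?;
    /// let mut readback = [0u8; 4];
    /// buffer.read_at(0, &mut readback)?;
    /// assert_eq!(readback, [0xde, 0xad, 0xbe, 0xef]);
    /// ```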
421    pub fn write_at(&mut self, offset: usize, src: &[u8]) -> Result<()> {
422        if self.cap() < offset + src.len() {
423            return Err(Error::TooSmall { size: self.cap(), offset, length: src.len() });
424        }
425        let mut part_start = 0;
426        let mut total = 0;
427        for part in self.parts.iter_mut() {
428            if offset + total < part_start + part.cap {
429                let written = part.write_at(offset + total - part_start, &src[total..])?;
430                total += written;
431                if total == src.len() {
432                    break;
433                }
434            } else {
435                part.len = part.cap;
436            }
437            part_start += part.cap;
438        }
439        assert_eq!(total, src.len());
440        Ok(())
441    }
442
443    /// Reads bytes from the buffer.
444    ///
445    /// Fills all of `dst` with bytes from the buffer beginning at `offset`.
446    /// Returns [`Error::TooSmall`] if the buffer does not contain
447    /// `offset + dst.len()` readable bytes; partial reads are not performed.
448    pub fn read_at(&self, offset: usize, dst: &mut [u8]) -> Result<()> {
449        if self.len() < offset + dst.len() {
450            return Err(Error::TooSmall { size: self.len(), offset, length: dst.len() });
451        }
452        let mut part_start = 0;
453        let mut total = 0;
454        for part in self.parts.iter() {
455            if offset + total < part_start + part.cap {
456                let read = part.read_at(offset + total - part_start, &mut dst[total..])?;
457                total += read;
458                if total == dst.len() {
459                    break;
460                }
461            }
462            part_start += part.cap;
463        }
464        assert_eq!(total, dst.len());
465        Ok(())
466    }
467
468    /// Returns this buffer as a mutable slice if it's not fragmented.
469    pub fn as_slice_mut(&mut self) -> Option<&mut [u8]> {
470        match &mut (*self.parts)[..] {
471            [] => Some(&mut []),
472            [one] => Some(one.as_slice_mut()),
473            _ => None,
474        }
475    }
476
477    /// Pads the [`Buffer`] to minimum tx buffer length requirements.
478    pub(in crate::session) fn pad(&mut self) -> Result<()> {
479        let num_parts = self.parts.len();
480        let BufferLayout { min_tx_tail, min_tx_data, min_tx_head: _, length: _ } =
481            self.alloc.pool.buffer_layout;
482        let mut target = min_tx_data;
483        for (i, part) in self.parts.iter_mut().enumerate() {
484            let grow_cap = if i == num_parts - 1 {
485                let descriptor =
486                    self.alloc.descriptors().last().expect("descriptor must not be empty");
487                let data_length = descriptor.data_length();
488                let tail_length = descriptor.tail_length();
489                // data_length + tail_length <= buffer_length <= usize::MAX.
490                let rest = usize::try_from(data_length).unwrap() + usize::from(tail_length);
491                match rest.checked_sub(usize::from(min_tx_tail)) {
492                    Some(grow_cap) => Some(grow_cap),
493                    None => break,
494                }
495            } else {
496                None
497            };
498            target -= part.pad(target, grow_cap)?;
499        }
500        if target != 0 {
501            return Err(Error::Pad(min_tx_data, self.cap()));
502        }
503        Ok(())
504    }
505
506    /// Leaks the underlying buffer descriptors to the driver.
507    ///
508    /// Returns the head of the leaked allocation.
509    pub(in crate::session) fn leak(mut self) -> DescId<K> {
510        let descs = std::mem::replace(&mut self.alloc.descs, Chained::empty());
511        descs.into_iter().next().unwrap()
512    }
513
514    /// Retrieves the frame type of the buffer.
515    pub fn frame_type(&self) -> Result<netdev::FrameType> {
516        self.alloc.descriptor().frame_type()
517    }
518
519    /// Retrieves the buffer's source port.
520    pub fn port(&self) -> Port {
521        self.alloc.descriptor().port()
522    }
523}
524
525impl Buffer<Tx> {
526    /// Commits the metadata for the buffer to descriptors.
527    pub(in crate::session) fn commit(&mut self) {
528        for (part, mut descriptor) in self.parts.iter_mut().zip(self.alloc.descriptors_mut()) {
529            // The following unwrap is safe because part.len must be smaller than
530            // buffer_length, which is a u32.
531            descriptor.commit(u32::try_from(part.len).unwrap())
532        }
533    }
534
535    /// Sets the buffer's destination port.
536    pub fn set_port(&mut self, port: Port) {
537        self.alloc.descriptor_mut().set_port(port)
538    }
539
540    /// Sets the frame type of the buffer.
541    pub fn set_frame_type(&mut self, frame_type: netdev::FrameType) {
542        self.alloc.descriptor_mut().set_frame_type(frame_type)
543    }
544
545    /// Sets TxFlags of a Tx buffer.
546    pub fn set_tx_flags(&mut self, flags: netdev::TxFlags) {
547        self.alloc.descriptor_mut().set_tx_flags(flags)
548    }
549}
550
551impl Buffer<Rx> {
552    /// Turns an rx buffer into a tx one.
553    pub async fn into_tx(self) -> Buffer<Tx> {
554        let Buffer { alloc, parts, pos } = self;
555        Buffer { alloc: alloc.into_tx().await, parts, pos }
556    }
557
558    /// Retrieves RxFlags of an Rx Buffer.
559    pub fn rx_flags(&self) -> Result<netdev::RxFlags> {
560        self.alloc.descriptor().rx_flags()
561    }
562}
563
564impl AllocGuard<Rx> {
565    /// Turns an rx allocation into a tx one.
566    ///
567    /// To achieve this we have to move the same number of descriptors from
568    /// the tx pool to the rx pool to compensate for our rx descriptors being
569    /// converted into tx ones.
570    async fn into_tx(mut self) -> AllocGuard<Tx> {
571        let mut tx = self.pool.alloc_tx(self.descs.len).await;
572        // [MaybeUninit<DescId<Tx>>; 4] and [MaybeUninit<DescId<Rx>>; 4] have the
573        // same memory layout because DescId is repr(transparent). So it is safe
574        // to transmute and swap the values between the storages. After the swap
575        // the drop implementation of self will return the descriptors back to
576        // the rx pool.
577        std::mem::swap(&mut self.descs.storage, unsafe {
578            std::mem::transmute(&mut tx.descs.storage)
579        });
580        tx
581    }
582}
583
584/// A non-empty container that has at most [`netdev::MAX_DESCRIPTOR_CHAIN`] elements.
585struct Chained<T> {
586    storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
587    len: ChainLength,
588}
589
590impl<T> Deref for Chained<T> {
591    type Target = [T];
592
593    fn deref(&self) -> &Self::Target {
594        // Safety: `self.storage[..self.len]` is already initialized.
595        unsafe { std::mem::transmute(&self.storage[..self.len.into()]) }
596    }
597}
598
599impl<T> DerefMut for Chained<T> {
600    fn deref_mut(&mut self) -> &mut Self::Target {
601        // Safety: `self.storage[..self.len]` is already initialized.
602        unsafe { std::mem::transmute(&mut self.storage[..self.len.into()]) }
603    }
604}
605
606impl<T> Drop for Chained<T> {
607    fn drop(&mut self) {
608        // Safety: `self.deref_mut()` is a slice of all initialized elements.
609        unsafe {
610            std::ptr::drop_in_place(self.deref_mut());
611        }
612    }
613}
614
615impl<T: Debug> Debug for Chained<T> {
616    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
617        f.debug_list().entries(self.iter()).finish()
618    }
619}
620
621impl<T> Chained<T> {
622    #[allow(clippy::uninit_assumed_init)]
623    fn empty() -> Self {
624        // Create an uninitialized array of `MaybeUninit`. The `assume_init` is
625        // safe because the type we are claiming to have initialized here is a
626        // bunch of `MaybeUninit`s, which do not require initialization.
627        // TODO(https://fxbug.dev/42160423): use MaybeUninit::uninit_array once it
628        // is stabilized.
629        // https://doc.rust-lang.org/std/mem/union.MaybeUninit.html#method.uninit_array
630        Self { storage: unsafe { MaybeUninit::uninit().assume_init() }, len: ChainLength::ZERO }
631    }
632}
633
634impl<T> FromIterator<T> for Chained<T> {
635    /// # Panics
636    ///
637    /// Panics if the iterator is empty or if it yields more than
638    /// [`netdev::MAX_DESCRIPTOR_CHAIN`] elements.
639    fn from_iter<I: IntoIterator<Item = T>>(elements: I) -> Self {
640        let mut result = Self::empty();
641        let mut len = 0u8;
642        for (idx, e) in elements.into_iter().enumerate() {
643            result.storage[idx] = MaybeUninit::new(e);
644            len += 1;
645        }
646        assert!(len > 0);
647        // `len` cannot be larger than `MAX_DESCRIPTOR_CHAIN`; otherwise we
648        // couldn't get here due to the bounds checks on `result.storage`.
649        result.len = ChainLength::try_from(len).unwrap();
650        result
651    }
652}
653
654impl<T> IntoIterator for Chained<T> {
655    type Item = T;
656    type IntoIter = ChainedIter<T>;
657
658    fn into_iter(mut self) -> Self::IntoIter {
659        let len = self.len;
660        self.len = ChainLength::ZERO;
661        // Safety: we have reset the length to zero, it is now safe to move out
662        // the values and set them to be uninitialized. The `assume_init` is
663        // safe because the type we are claiming to have initialized here is a
664        // bunch of `MaybeUninit`s, which do not require initialization.
665        // TODO(https://fxbug.dev/42160423): use MaybeUninit::uninit_array once it
666        // is stabilized.
667        #[allow(clippy::uninit_assumed_init)]
668        let storage =
669            std::mem::replace(&mut self.storage, unsafe { MaybeUninit::uninit().assume_init() });
670        ChainedIter { storage, len, consumed: 0 }
671    }
672}
673
674struct ChainedIter<T> {
675    storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
676    len: ChainLength,
677    consumed: u8,
678}
679
680impl<T> Iterator for ChainedIter<T> {
681    type Item = T;
682
683    fn next(&mut self) -> Option<Self::Item> {
684        if self.consumed < self.len.get() {
685            // Safety: it is safe now to replace that slot with an uninitialized
686            // value because we will advance consumed by 1.
687            let value = unsafe {
688                std::mem::replace(
689                    &mut self.storage[usize::from(self.consumed)],
690                    MaybeUninit::uninit(),
691                )
692                .assume_init()
693            };
694            self.consumed += 1;
695            Some(value)
696        } else {
697            None
698        }
699    }
700
701    fn size_hint(&self) -> (usize, Option<usize>) {
702        let len = usize::from(self.len.get() - self.consumed);
703        (len, Some(len))
704    }
705}
706
707impl<T> ExactSizeIterator for ChainedIter<T> {}
708
709impl<T> Drop for ChainedIter<T> {
710    fn drop(&mut self) {
711        // Safety: `self.storage[self.consumed..self.len]` is initialized.
712        unsafe {
713            std::ptr::drop_in_place(std::mem::transmute::<_, &mut [T]>(
714                &mut self.storage[self.consumed.into()..self.len.into()],
715            ));
716        }
717    }
718}
719
720/// Guards the allocated descriptors; they will be freed when dropped.
721pub(in crate::session) struct AllocGuard<K: AllocKind> {
722    descs: Chained<DescId<K>>,
723    pool: Arc<Pool>,
724}
725
726impl<K: AllocKind> Debug for AllocGuard<K> {
727    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
728        let Self { descs, pool: _ } = self;
729        f.debug_struct("AllocGuard").field("descs", descs).finish()
730    }
731}
732
733impl<K: AllocKind> AllocGuard<K> {
734    fn new(descs: Chained<DescId<K>>, pool: Arc<Pool>) -> Self {
735        Self { descs, pool }
736    }
737
738    /// Iterates over references to the descriptors.
739    fn descriptors(&self) -> impl Iterator<Item = DescRef<'_, K>> + '_ {
740        self.descs.iter().map(move |desc| self.pool.descriptors.borrow(desc))
741    }
742
743    /// Iterates over mutable references to the descriptors.
744    fn descriptors_mut(&mut self) -> impl Iterator<Item = DescRefMut<'_, K>> + '_ {
745        let descriptors = &self.pool.descriptors;
746        self.descs.iter_mut().map(move |desc| descriptors.borrow_mut(desc))
747    }
748
749    /// Gets a reference to the head descriptor.
750    fn descriptor(&self) -> DescRef<'_, K> {
751        self.descriptors().next().expect("descriptors must not be empty")
752    }
753
754    /// Gets a mutable reference to the head descriptor.
755    fn descriptor_mut(&mut self) -> DescRefMut<'_, K> {
756        self.descriptors_mut().next().expect("descriptors must not be empty")
757    }
758}
759
760impl AllocGuard<Tx> {
761    /// Initializes descriptors of a tx allocation.
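    ///
    /// Worked example (added for clarity, using the defaults from the tests
    /// below: 64-byte buffers, 4-byte head, 8-byte tail): a 100-byte request
    /// split over two descriptors is laid out as (head 4, data 60, tail 0)
    /// followed by (head 0, data 40, tail 24); the 16 extra bytes of tail
    /// keep the caller from writing more than the requested 100 bytes.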
762    fn init(&mut self, mut requested_bytes: usize) -> Result<()> {
763        let len = self.len();
764        let BufferLayout { min_tx_head, min_tx_tail, length: buffer_length, min_tx_data: _ } =
765            self.pool.buffer_layout;
766        for (mut descriptor, clen) in self.descriptors_mut().zip((0..len).rev()) {
767            let chain_length = ChainLength::try_from(clen).unwrap();
768            let head_length = if clen + 1 == len { min_tx_head } else { 0 };
769            let mut tail_length = if clen == 0 { min_tx_tail } else { 0 };
770
771            // buffer_length is at least head_length + tail_length (checked when
772            // the pool config was created), so the subtraction won't overflow.
773            let available_bytes =
774                u32::try_from(buffer_length - usize::from(head_length) - usize::from(tail_length))
775                    .unwrap();
776
777            let data_length = match u32::try_from(requested_bytes) {
778                Ok(requested) => {
779                    if requested < available_bytes {
780                        // The requested bytes are less than what is available,
781                        // we need to put the excess in the tail so that the
782                        // user cannot write more than they requested.
783                        tail_length = u16::try_from(available_bytes - requested)
784                            .ok_checked::<TryFromIntError>()
785                            .and_then(|tail_adjustment| tail_length.checked_add(tail_adjustment))
786                            .ok_or(Error::TxLength)?;
787                    }
788                    requested.min(available_bytes)
789                }
790                Err(TryFromIntError { .. }) => available_bytes,
791            };
792
793            requested_bytes -=
794                usize::try_from(data_length).unwrap_or_else(|TryFromIntError { .. }| {
795                    panic!(
796                        "data_length: {} must be smaller than requested_bytes: {}, which is a usize",
797                        data_length, requested_bytes
798                    )
799                });
800            descriptor.initialize(chain_length, head_length, data_length, tail_length);
801        }
802        assert_eq!(requested_bytes, 0);
803        Ok(())
804    }
805}
806
807impl<K: AllocKind> Drop for AllocGuard<K> {
808    fn drop(&mut self) {
809        if self.is_empty() {
810            return;
811        }
812        K::free(private::Allocation(self));
813    }
814}
815
816impl<K: AllocKind> Deref for AllocGuard<K> {
817    type Target = [DescId<K>];
818
819    fn deref(&self) -> &Self::Target {
820        self.descs.deref()
821    }
822}
823
824/// A contiguous region of the buffer; corresponding to one descriptor.
825///
826/// [`BufferPart`] owns the memory range [ptr, ptr+cap).
827struct BufferPart {
828    /// The data region starts at `ptr`.
829    ptr: *mut u8,
830    /// The capacity for the region is `cap`.
831    cap: usize,
832    /// Indicates how many bytes are actually in the buffer; it starts
833    /// as 0 for a tx buffer and as `cap` for an rx buffer. It will later
834    /// be used as `data_length` in the descriptor.
835    len: usize,
836}
837
838impl BufferPart {
839    /// Creates a new [`BufferPart`] that owns the memory region.
840    ///
841    /// # Safety
842    ///
843    /// The caller must make sure the memory pointed to by `ptr` outlives the
844    /// `BufferPart` being constructed. Once a `BufferPart` is constructed, it is
845    /// assumed that the memory `[ptr..ptr+cap)` is always valid to read and
846    /// write.
847    unsafe fn new(ptr: *mut u8, cap: usize, len: usize) -> Self {
848        Self { ptr, cap, len }
849    }
850
851    /// Reads bytes from this buffer part.
852    ///
853    /// Reads up to `dst.len()` bytes from the region beginning at `offset`,
854    /// returning how many bytes were read successfully. Partial read is
855    /// not considered as an error.
856    fn read_at(&self, offset: usize, dst: &mut [u8]) -> Result<usize> {
857        let available = self.len.checked_sub(offset).ok_or(Error::Index(offset, self.len))?;
858        let to_copy = std::cmp::min(available, dst.len());
859        // Safety: the source memory region is valid for reads and the
860        // destination memory region is valid for writes.
861        unsafe { std::ptr::copy_nonoverlapping(self.ptr.add(offset), dst.as_mut_ptr(), to_copy) }
862        Ok(to_copy)
863    }
864
865    /// Writes bytes to this buffer part.
866    ///
867    /// Writes up to `src.len()` bytes into the region beginning at `offset`,
868    /// returning how many bytes were written successfully. Partial write is
869    /// not considered as an error.
870    fn write_at(&mut self, offset: usize, src: &[u8]) -> Result<usize> {
871        let available = self.cap.checked_sub(offset).ok_or(Error::Index(offset, self.cap))?;
872        let to_copy = std::cmp::min(src.len(), available);
873        // Safety: the source memory region is valid for reads and the
874        // destination memory region is valid for writes.
875        unsafe { std::ptr::copy_nonoverlapping(src.as_ptr(), self.ptr.add(offset), to_copy) }
876        self.len = std::cmp::max(self.len, offset + to_copy);
877        Ok(to_copy)
878    }
879
880    /// Pads this part of the buffer to have length `target`.
881    ///
882    /// `limit` describes how far this region may grow beyond its capacity:
883    /// `None` means the part is not allowed to grow and padding must be done
884    /// within the existing capacity, while `Some(limit)` means this part is
885    /// allowed to extend its capacity up to the limit.
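    ///
    /// E.g. (illustrative): a part with `len == 3` and `cap == 10` padded via
    /// `pad(8, None)` zero-fills bytes 3..8, sets `len` to 8 and returns 8.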
886    fn pad(&mut self, target: usize, limit: Option<usize>) -> Result<usize> {
887        if target <= self.len {
888            return Ok(target);
889        }
890        if let Some(limit) = limit {
891            if target > limit {
892                return Err(Error::Pad(target, self.cap));
893            }
894            if self.cap < target {
895                self.cap = target
896            }
897        }
898        let new_len = std::cmp::min(target, self.cap);
899        // Safety: This is safe because the destination memory region is valid
900        // for write.
901        unsafe {
902            std::ptr::write_bytes(self.ptr.add(self.len), 0, new_len - self.len);
903        }
904        self.len = new_len;
905        Ok(new_len)
906    }
907
908    /// Returns the buffer part as a slice.
909    fn as_slice_mut(&mut self) -> &mut [u8] {
910        // SAFETY: BufferPart requires the caller to guarantee that the ptr used
911        // to create it outlives the instance. BufferPart itself is not Copy or
912        // Clone, so we can rely on the safety contract of `new` to uphold this.
913        unsafe { std::slice::from_raw_parts_mut(self.ptr, self.len) }
914    }
915}
916
917// `Buffer` needs to be `Send` in order to be useful in async code. Instead
918// of marking `Buffer` as `Send` directly, `BufferPart` is `Send` already
919// and we can let the compiler do the deduction.
920unsafe impl Send for BufferPart {}
921
922impl Debug for BufferPart {
923    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
924        let BufferPart { len, cap, ptr } = &self;
925        f.debug_struct("BufferPart").field("ptr", ptr).field("len", len).field("cap", cap).finish()
926    }
927}
928
929impl<K: AllocKind> From<AllocGuard<K>> for Buffer<K> {
930    fn from(alloc: AllocGuard<K>) -> Self {
931        let AllocGuard { pool, descs: _ } = &alloc;
932        let parts: Chained<BufferPart> = alloc
933            .descriptors()
934            .map(|descriptor| {
935                // The following unwraps are safe because they are already
936                // checked in `DeviceInfo::config`.
937                let offset = usize::try_from(descriptor.offset()).unwrap();
938                let head_length = usize::from(descriptor.head_length());
939                let data_length = usize::try_from(descriptor.data_length()).unwrap();
940                let len = match K::REFL {
941                    AllocKindRefl::Tx => 0,
942                    AllocKindRefl::Rx => data_length,
943                };
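                // Illustrative note (added for clarity): a descriptor with
                // offset 128, head_length 4 and data_length 52 maps to the 52
                // bytes starting at `base + 132`.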
944                // Sanity check: make sure the layout is valid.
945                assert!(
946                    offset + head_length <= pool.bytes,
947                    "buffer part starts beyond the end of pool"
948                );
949                assert!(
950                    offset + head_length + data_length <= pool.bytes,
951                    "buffer part ends beyond the end of pool"
952                );
953                // This is safe because the `AllocGuard` makes sure the
954                // underlying memory is valid for the entire time when
955                // `BufferPart` is alive; `add` is safe because
956                // `offset + head_length` is within the allocation and
957                // smaller than isize::MAX.
958                unsafe {
959                    BufferPart::new(pool.base.as_ptr().add(offset + head_length), data_length, len)
960                }
961            })
962            .collect();
963        Self { alloc, parts, pos: 0 }
964    }
965}
966
967impl<K: AllocKind> Read for Buffer<K> {
968    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
969        self.read_at(self.pos, buf)
970            .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidInput, error))?;
971        self.pos += buf.len();
972        Ok(buf.len())
973    }
974}
975
976impl Write for Buffer<Tx> {
977    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
978        self.write_at(self.pos, buf)
979            .map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidInput, error))?;
980        self.pos += buf.len();
981        Ok(buf.len())
982    }
983
984    fn flush(&mut self) -> std::io::Result<()> {
985        Ok(())
986    }
987}
988
989impl<K: AllocKind> Seek for Buffer<K> {
990    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
991        let pos = match pos {
992            SeekFrom::Start(pos) => pos,
993            SeekFrom::End(offset) => {
994                let end = i64::try_from(self.cap())
995                    .map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
996                u64::try_from(end.wrapping_add(offset)).unwrap()
997            }
998            SeekFrom::Current(offset) => {
999                let current = i64::try_from(self.pos)
1000                    .map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
1001                u64::try_from(current.wrapping_add(offset)).unwrap()
1002            }
1003        };
1004        self.pos =
1005            usize::try_from(pos).map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
1006        Ok(pos)
1007    }
1008}
1009
1010/// A pending tx allocation request.
1011struct TxAllocReq {
1012    sender: Sender<AllocGuard<Tx>>,
1013    size: ChainLength,
1014}
1015
1016impl TxAllocReq {
1017    fn new(size: ChainLength) -> (Self, Receiver<AllocGuard<Tx>>) {
1018        let (sender, receiver) = channel();
1019        (TxAllocReq { sender, size }, receiver)
1020    }
1021
1022    /// Fulfills the pending request with an `AllocGuard`.
1023    ///
1024    /// If the request is already closed, the guard is simply dropped and
1025    /// returned to the pool.
1026    ///
1027    /// `fulfill` must *not* be called when the `guard`'s pool is holding the tx
1028    /// lock, since we may deadlock/panic upon the double tx lock acquisition.
1029    fn fulfill(self, guard: AllocGuard<Tx>) {
1030        let Self { sender, size: _ } = self;
1031        match sender.send(guard) {
1032            Ok(()) => (),
1033            Err(guard) => {
1034                // It's ok to just drop the guard here, it'll be returned to the
1035                // pool.
1036                drop(guard);
1037            }
1038        }
1039    }
1040}
1041
1042/// A module for sealed traits so that users of this crate cannot implement
1043/// [`AllocKind`] for anything other than [`Rx`] and [`Tx`].
1044mod private {
1045    use super::{AllocKind, Rx, Tx};
1046    pub trait Sealed: 'static + Sized {}
1047    impl Sealed for Rx {}
1048    impl Sealed for Tx {}
1049
1050    // We can't leak a private type in a public trait, so we create an opaque
1051    // private newtype for &mut super::AllocGuard so that we can mention it in
1052    // the AllocKind trait.
1053    pub struct Allocation<'a, K: AllocKind>(pub(super) &'a mut super::AllocGuard<K>);
1054}
1055
1056/// An allocation can have two kinds; this trait provides a way to project a
1057/// type ([`Rx`] or [`Tx`]) into a value.
1058pub trait AllocKind: private::Sealed {
1059    /// The reflected value of Self.
1060    const REFL: AllocKindRefl;
1061
1062    /// Frees an allocation of the given kind.
1063    fn free(alloc: private::Allocation<'_, Self>);
1064}
1065
1066/// A tag type for Tx allocations.
1067pub enum Tx {}
1068/// A tag type for Rx allocations.
1069pub enum Rx {}
1070
1071/// The reflected value that allows inspection on an [`AllocKind`] type.
1072pub enum AllocKindRefl {
1073    Tx,
1074    Rx,
1075}
1076
1077impl AllocKindRefl {
1078    pub(in crate::session) fn as_str(&self) -> &'static str {
1079        match self {
1080            AllocKindRefl::Tx => "Tx",
1081            AllocKindRefl::Rx => "Rx",
1082        }
1083    }
1084}
1085
1086impl AllocKind for Tx {
1087    const REFL: AllocKindRefl = AllocKindRefl::Tx;
1088
1089    fn free(alloc: private::Allocation<'_, Self>) {
1090        let private::Allocation(AllocGuard { pool, descs }) = alloc;
1091        pool.free_tx(std::mem::replace(descs, Chained::empty()));
1092    }
1093}
1094
1095impl AllocKind for Rx {
1096    const REFL: AllocKindRefl = AllocKindRefl::Rx;
1097
1098    fn free(alloc: private::Allocation<'_, Self>) {
1099        let private::Allocation(AllocGuard { pool, descs }) = alloc;
1100        pool.free_rx(std::mem::replace(descs, Chained::empty()));
1101        pool.rx_leases.rx_complete();
1102    }
1103}
1104
1105/// An extracted struct containing state pertaining to watching rx leases.
1106pub(in crate::session) struct RxLeaseHandlingState {
1107    can_watch_rx_leases: AtomicBool,
1108    /// Keeps a rolling counter of received rx frames MINUS the target frame
1109    /// number of the current outstanding lease.
1110    ///
1111    /// When no leases are pending (via [`RxLeaseWatcher::wait_until`]),
1112    /// then this matches exactly the number of received frames.
1113    ///
1114    /// Otherwise, the lease is currently waiting for the remaining `u64::MAX -
1115    /// rx_frame_counter` frames. The logic depends on `AtomicU64` wrapping
1116    /// around as part of completing rx buffers.
1117    rx_frame_counter: AtomicU64,
1118    rx_lease_waker: AtomicWaker,
1119}
1120
1121impl RxLeaseHandlingState {
1122    fn new_with_flags(flags: netdev::SessionFlags) -> Self {
1123        Self::new_with_enabled(flags.contains(netdev::SessionFlags::RECEIVE_RX_POWER_LEASES))
1124    }
1125
1126    fn new_with_enabled(enabled: bool) -> Self {
1127        Self {
1128            can_watch_rx_leases: AtomicBool::new(enabled),
1129            rx_frame_counter: AtomicU64::new(0),
1130            rx_lease_waker: AtomicWaker::new(),
1131        }
1132    }
1133
1134    /// Increments the total receive frame counter and possibly wakes up a
1135    /// waiting lease yielder.
1136    fn rx_complete(&self) {
1137        let Self { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } = self;
1138        let prev = rx_frame_counter.fetch_add(1, atomic::Ordering::SeqCst);
1139
1140        // See wait_until for details. We need to hit a waker whenever our add
1141        // wrapped the u64 back around to 0.
1142        if prev == u64::MAX {
1143            rx_lease_waker.wake();
1144        }
1145    }
1146}
1147
1148/// A trait allowing [`RxLeaseWatcher`] to be agnostic over how to get an
1149/// [`RxLeaseHandlingState`].
1150pub(in crate::session) trait RxLeaseHandlingStateContainer {
1151    fn lease_handling_state(&self) -> &RxLeaseHandlingState;
1152}
1153
1154impl<T: Borrow<RxLeaseHandlingState>> RxLeaseHandlingStateContainer for T {
1155    fn lease_handling_state(&self) -> &RxLeaseHandlingState {
1156        self.borrow()
1157    }
1158}
1159
1160impl RxLeaseHandlingStateContainer for Arc<Pool> {
1161    fn lease_handling_state(&self) -> &RxLeaseHandlingState {
1162        &self.rx_leases
1163    }
1164}
1165
1166/// A type-safe wrapper around a single lease watcher per `Pool`.
1167pub(in crate::session) struct RxLeaseWatcher<T> {
1168    state: T,
1169}
1170
1171impl<T: RxLeaseHandlingStateContainer> RxLeaseWatcher<T> {
1172    /// Creates a new lease watcher.
1173    ///
1174    /// # Panics
1175    ///
1176    /// Panics if an [`RxLeaseWatcher`] has already been created for the given
1177    /// pool or the pool was not configured for it.
1178    pub(in crate::session) fn new(state: T) -> Self {
1179        assert!(
1180            state.lease_handling_state().can_watch_rx_leases.swap(false, atomic::Ordering::SeqCst),
1181            "can't watch rx leases"
1182        );
1183        Self { state }
1184    }
1185
1186    /// Called by sessions to wait until `hold_until_frame` is fulfilled to
1187    /// yield leases out.
1188    ///
1189    /// Blocks until the `hold_until_frame`-th rx buffer has been released.
1190    ///
1191    /// Note that this method takes `&mut self` because only one
1192    /// [`RxLeaseWatcher`] may be created per lease handling state, and exclusive
1193    /// access to it is required to watch lease completion.
1194    pub(in crate::session) async fn wait_until(&mut self, hold_until_frame: u64) {
1195        // A note about wrap-arounds.
1196        //
1197        // We're assuming the frame counter will never wrap around for
1198        // correctness here. This should be fine, even assuming a packet
1199        // rate of 1 million pps it'd take almost 600k years for this counter
1200        // to wrap around:
1201        // - 2^64 / 1e6 / 60 / 60 / 24 / 365 ~ 584e3.
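        //
        // Worked example of the wrap-around trick below (added for clarity):
        // suppose 10 frames have been received (counter == 10) and
        // `hold_until_frame` is 25. `fetch_sub(25)` returns `prev == 10` and
        // leaves the counter at `u64::MAX - 14`; since `prev < 25` we wait
        // with `threshold == 10u64.wrapping_sub(25) == u64::MAX - 14`. Each
        // completed rx buffer adds 1, so after 15 more frames (25 received in
        // total) the counter wraps past `u64::MAX` to 0 < threshold,
        // `rx_complete` sees `prev == u64::MAX` and fires the waker, and the
        // wait finishes.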
1202
1203        let RxLeaseHandlingState { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } =
1204            self.state.lease_handling_state();
1205
1206        let prev = rx_frame_counter.fetch_sub(hold_until_frame, atomic::Ordering::SeqCst);
1207        // After having subtracted the waiting value we *must always restore the
1208        // value* on return, even if the future is not polled to completion.
1209        let _guard = scopeguard::guard((), |()| {
1210            let _: u64 = rx_frame_counter.fetch_add(hold_until_frame, atomic::Ordering::SeqCst);
1211        });
1212
1213        // Lease is ready to be fulfilled.
1214        if prev >= hold_until_frame {
1215            return;
1216        }
1217        // The threshold is a wrapped-around subtraction, so now we must wait
1218        // until the value read from the atomic is LESS THAN the threshold.
1219        let threshold = prev.wrapping_sub(hold_until_frame);
1220        futures::future::poll_fn(|cx| {
1221            let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
1222            if v < threshold {
1223                return Poll::Ready(());
1224            }
1225            rx_lease_waker.register(cx.waker());
1226            let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
1227            if v < threshold {
1228                return Poll::Ready(());
1229            }
1230            Poll::Pending
1231        })
1232        .await;
1233    }
1234}
1235
1236#[cfg(test)]
1237mod tests {
1238    use super::*;
1239
1240    use assert_matches::assert_matches;
1241    use fuchsia_async as fasync;
1242    use futures::future::FutureExt;
1243    use test_case::test_case;
1244
1245    use std::collections::HashSet;
1246    use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize};
1247    use std::pin::pin;
1248    use std::task::{Poll, Waker};
1249
1250    const DEFAULT_MIN_TX_BUFFER_HEAD: u16 = 4;
1251    const DEFAULT_MIN_TX_BUFFER_TAIL: u16 = 8;
1252    // These unwraps are fine because none of the values are zero.
1253    const DEFAULT_BUFFER_LENGTH: NonZeroUsize = NonZeroUsize::new(64).unwrap();
1254    const DEFAULT_TX_BUFFERS: NonZeroU16 = NonZeroU16::new(8).unwrap();
1255    const DEFAULT_RX_BUFFERS: NonZeroU16 = NonZeroU16::new(8).unwrap();
1256    const MAX_BUFFER_BYTES: usize = DEFAULT_BUFFER_LENGTH.get()
1257        * netdev::MAX_DESCRIPTOR_CHAIN as usize
1258        - DEFAULT_MIN_TX_BUFFER_HEAD as usize
1259        - DEFAULT_MIN_TX_BUFFER_TAIL as usize;
1260
1261    const SENTINEL_BYTE: u8 = 0xab;
1262    const WRITE_BYTE: u8 = 1;
1263    const PAD_BYTE: u8 = 0;
1264
1265    const DEFAULT_CONFIG: Config = Config {
1266        buffer_stride: NonZeroU64::new(DEFAULT_BUFFER_LENGTH.get() as u64).unwrap(),
1267        num_rx_buffers: DEFAULT_RX_BUFFERS,
1268        num_tx_buffers: DEFAULT_TX_BUFFERS,
1269        options: netdev::SessionFlags::empty(),
1270        buffer_layout: BufferLayout {
1271            length: DEFAULT_BUFFER_LENGTH.get(),
1272            min_tx_head: DEFAULT_MIN_TX_BUFFER_HEAD,
1273            min_tx_tail: DEFAULT_MIN_TX_BUFFER_TAIL,
1274            min_tx_data: 0,
1275        },
1276    };
1277
1278    impl Pool {
1279        fn new_test_default() -> Arc<Self> {
1280            let (pool, _descriptors, _data) =
1281                Pool::new(DEFAULT_CONFIG).expect("failed to create default pool");
1282            pool
1283        }
1284
1285        async fn alloc_tx_checked(self: &Arc<Self>, n: u8) -> AllocGuard<Tx> {
1286            self.alloc_tx(ChainLength::try_from(n).expect("failed to convert to chain length"))
1287                .await
1288        }
1289
1290        fn alloc_tx_now_or_never(self: &Arc<Self>, n: u8) -> Option<AllocGuard<Tx>> {
1291            self.alloc_tx_checked(n).now_or_never()
1292        }
1293
1294        fn alloc_tx_all(self: &Arc<Self>, n: u8) -> Vec<AllocGuard<Tx>> {
1295            std::iter::from_fn(|| self.alloc_tx_now_or_never(n)).collect()
1296        }
1297
1298        fn alloc_tx_buffer_now_or_never(self: &Arc<Self>, num_bytes: usize) -> Option<Buffer<Tx>> {
1299            self.alloc_tx_buffer(num_bytes)
1300                .now_or_never()
1301                .transpose()
1302                .expect("invalid arguments for alloc_tx_buffer")
1303        }
1304
1305        fn set_min_tx_buffer_length(self: &mut Arc<Self>, length: usize) {
1306            Arc::get_mut(self).unwrap().buffer_layout.min_tx_data = length;
1307        }
1308
1309        fn fill_sentinel_bytes(&mut self) {
1310            // Safety: We have mut reference to Pool, so we get to modify the
1311            // VMO pointed by self.base.
1312            unsafe { std::ptr::write_bytes(self.base.as_ptr(), SENTINEL_BYTE, self.bytes) };
1313        }
1314    }
1315
1316    impl Buffer<Tx> {
1317        // Writes a byte at `offset`; the resulting buffer should be `pad_size`
1318        // long, with 0..offset being SENTINEL_BYTE, offset being WRITE_BYTE and
1319        // the rest being PAD_BYTE.
1320        fn check_write_and_pad(&mut self, offset: usize, pad_size: usize) {
1321            self.write_at(offset, &[WRITE_BYTE][..]).expect("failed to write to self");
1322            self.pad().expect("failed to pad");
1323            assert_eq!(self.len(), pad_size);
1324            // An arbitrary value that is not SENTINEL/WRITE/PAD_BYTE so that
1325            // we can make sure the write really happened.
1326            const INIT_BYTE: u8 = 42;
1327            let mut read_buf = vec![INIT_BYTE; pad_size];
1328            self.read_at(0, &mut read_buf[..]).expect("failed to read from self");
1329            for (idx, byte) in read_buf.iter().enumerate() {
1330                if idx < offset {
1331                    assert_eq!(*byte, SENTINEL_BYTE);
1332                } else if idx == offset {
1333                    assert_eq!(*byte, WRITE_BYTE);
1334                } else {
1335                    assert_eq!(*byte, PAD_BYTE);
1336                }
1337            }
1338        }
1339    }
1340
1341    impl<K, I, T> PartialEq<T> for Chained<DescId<K>>
1342    where
1343        K: AllocKind,
1344        I: ExactSizeIterator<Item = u16>,
1345        T: Copy + IntoIterator<IntoIter = I>,
1346    {
1347        fn eq(&self, other: &T) -> bool {
1348            let iter = other.into_iter();
1349            if usize::from(self.len) != iter.len() {
1350                return false;
1351            }
1352            self.iter().zip(iter).all(|(l, r)| l.get() == r)
1353        }
1354    }
1355
1356    impl Debug for TxAllocReq {
1357        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1358            let TxAllocReq { sender: _, size } = self;
1359            f.debug_struct("TxAllocReq").field("size", &size).finish_non_exhaustive()
1360        }
1361    }
1362
1363    #[test]
1364    fn alloc_tx_distinct() {
1365        let pool = Pool::new_test_default();
1366        let allocated = pool.alloc_tx_all(1);
1367        assert_eq!(allocated.len(), DEFAULT_TX_BUFFERS.get().into());
1368        let distinct = allocated
1369            .iter()
1370            .map(|alloc| {
1371                assert_eq!(alloc.descs.len(), 1);
1372                alloc.descs[0].get()
1373            })
1374            .collect::<HashSet<u16>>();
1375        assert_eq!(allocated.len(), distinct.len());
1376    }
1377
1378    #[test]
1379    fn alloc_tx_free_len() {
1380        let pool = Pool::new_test_default();
1381        {
1382            let allocated = pool.alloc_tx_all(2);
1383            assert_eq!(
1384                allocated.iter().fold(0, |acc, a| { acc + a.descs.len() }),
1385                DEFAULT_TX_BUFFERS.get().into()
1386            );
1387            assert_eq!(pool.tx_alloc_state.lock().free_list.len, 0);
1388        }
1389        assert_eq!(pool.tx_alloc_state.lock().free_list.len, DEFAULT_TX_BUFFERS.get());
1390    }
1391
1392    #[test]
1393    fn alloc_tx_chain() {
1394        let pool = Pool::new_test_default();
1395        let allocated = pool.alloc_tx_all(3);
1396        assert_eq!(allocated.len(), usize::from(DEFAULT_TX_BUFFERS.get()) / 3);
1397        assert_matches!(pool.alloc_tx_now_or_never(3), None);
1398        assert_matches!(pool.alloc_tx_now_or_never(2), Some(a) if a.descs.len() == 2);
1399    }
1400
1401    #[test]
1402    fn alloc_tx_many() {
1403        let pool = Pool::new_test_default();
1404        let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1405            - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1406            - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1407        let data_len = usize::try_from(data_len).unwrap();
1408        let mut buffers = pool
1409            .alloc_tx_buffers(data_len)
1410            .now_or_never()
1411            .expect("failed to alloc")
1412            .unwrap()
1413            // Collect into a Vec to keep the buffers alive; otherwise they are
1414            // immediately returned to the pool (see the sketch after this test).
1415            .collect::<Result<Vec<_>>>()
1416            .expect("buffer error");
1417        assert_eq!(buffers.len(), DEFAULT_TX_BUFFERS.get().into());
1418
1419        // We have all the buffers, which means allocating more should not
1420        // resolve.
1421        assert!(pool.alloc_tx_buffers(data_len).now_or_never().is_none());
1422
1423        // If we release a single buffer we should be able to retrieve it again.
1424        assert_matches!(buffers.pop(), Some(_));
1425        let mut more_buffers =
1426            pool.alloc_tx_buffers(data_len).now_or_never().expect("failed to alloc").unwrap();
1427        let buffer = assert_matches!(more_buffers.next(), Some(Ok(b)) => b);
1428        assert_matches!(more_buffers.next(), None);
1429        // The iterator is fused, so None is yielded even after dropping the
1430        // buffer.
1431        drop(buffer);
1432        assert_matches!(more_buffers.next(), None);
1433    }
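
    // A minimal sketch isolating the drop behavior noted in `alloc_tx_many`,
    // assuming the allocation helpers behave as exercised above: once every
    // allocated buffer is dropped, allocating the whole pool again resolves
    // immediately.
    #[test]
    fn alloc_tx_all_after_drop() {
        let pool = Pool::new_test_default();
        let data_len = DEFAULT_BUFFER_LENGTH.get()
            - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
            - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL);
        let buffers = pool
            .alloc_tx_buffers(data_len)
            .now_or_never()
            .expect("failed to alloc")
            .unwrap()
            .collect::<Result<Vec<_>>>()
            .expect("buffer error");
        assert_eq!(buffers.len(), DEFAULT_TX_BUFFERS.get().into());
        // Returning every buffer to the pool lets the next allocation resolve
        // without blocking.
        drop(buffers);
        assert!(pool.alloc_tx_buffers(data_len).now_or_never().is_some());
    }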
1434
1435    #[test]
1436    fn alloc_tx_after_free() {
1437        let pool = Pool::new_test_default();
1438        let mut allocated = pool.alloc_tx_all(1);
1439        assert_matches!(pool.alloc_tx_now_or_never(2), None);
1440        {
1441            let _drained = allocated.drain(..2);
1442        }
1443        assert_matches!(pool.alloc_tx_now_or_never(2), Some(a) if a.descs.len() == 2);
1444    }
1445
1446    #[test]
1447    fn blocking_alloc_tx() {
1448        let mut executor = fasync::TestExecutor::new();
1449        let pool = Pool::new_test_default();
1450        let mut allocated = pool.alloc_tx_all(1);
1451        let alloc_fut = pool.alloc_tx_checked(1);
1452        let mut alloc_fut = pin!(alloc_fut);
1453        // The allocation should block.
1454        assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
1455        // And the allocation request should be queued.
1456        assert!(!pool.tx_alloc_state.lock().requests.is_empty());
1457        let freed = allocated
1458            .pop()
1459            .expect("no fulfilled allocations")
1460            .iter()
1461            .map(|x| x.get())
1462            .collect::<Chained<_>>();
1463        let same_as_freed =
1464            |descs: &Chained<DescId<Tx>>| descs.iter().map(|x| x.get()).eq(freed.iter().copied());
1465        // Now the task should be able to continue.
1466        assert_matches!(
1467            &executor.run_until_stalled(&mut alloc_fut),
1468            Poll::Ready(AllocGuard{ descs, pool: _ }) if same_as_freed(descs)
1469        );
1470        // And the queued request should now be removed.
1471        assert!(pool.tx_alloc_state.lock().requests.is_empty());
1472    }
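
    // A minimal sketch contrasting with `blocking_alloc_tx`, assuming
    // `alloc_tx_checked` behaves as exercised above: while free descriptors
    // remain, the allocation future resolves without blocking.
    #[test]
    fn alloc_tx_checked_does_not_block_when_free() {
        let mut executor = fasync::TestExecutor::new();
        let pool = Pool::new_test_default();
        let alloc_fut = pool.alloc_tx_checked(1);
        let mut alloc_fut = pin!(alloc_fut);
        match executor.run_until_stalled(&mut alloc_fut) {
            Poll::Ready(alloc) => assert_eq!(alloc.len(), 1),
            Poll::Pending => panic!("allocation should not block when descriptors are free"),
        }
    }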
1473
1474    #[test]
1475    fn blocking_alloc_tx_cancel_before_free() {
1476        let mut executor = fasync::TestExecutor::new();
1477        let pool = Pool::new_test_default();
1478        let mut allocated = pool.alloc_tx_all(1);
1479        {
1480            let alloc_fut = pool.alloc_tx_checked(1);
1481            let mut alloc_fut = pin!(alloc_fut);
1482            assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
1483            assert_matches!(
1484                pool.tx_alloc_state.lock().requests.as_slices(),
1485                (&[ref req1, ref req2], &[]) if req1.size.get() == 1 && req2.size.get() == 1
1486            );
1487        }
1488        assert_matches!(
1489            allocated.pop(),
1490            Some(AllocGuard { ref descs, pool: ref p })
1491                if descs == &[DEFAULT_TX_BUFFERS.get() - 1] && Arc::ptr_eq(p, &pool)
1492        );
1493        let state = pool.tx_alloc_state.lock();
1494        assert_eq!(state.free_list.len, 1);
1495        assert!(state.requests.is_empty());
1496    }
1497
1498    #[test]
1499    fn blocking_alloc_tx_cancel_after_free() {
1500        let mut executor = fasync::TestExecutor::new();
1501        let pool = Pool::new_test_default();
1502        let mut allocated = pool.alloc_tx_all(1);
1503        {
1504            let alloc_fut = pool.alloc_tx_checked(1);
1505            let mut alloc_fut = pin!(alloc_fut);
1506            assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
1507            assert_matches!(
1508                pool.tx_alloc_state.lock().requests.as_slices(),
1509                (&[ref req1, ref req2], &[]) if req1.size.get() == 1 && req2.size.get() == 1
1510            );
1511            assert_matches!(
1512                allocated.pop(),
1513                Some(AllocGuard { ref descs, pool: ref p })
1514                    if descs == &[DEFAULT_TX_BUFFERS.get() - 1] && Arc::ptr_eq(p, &pool)
1515            );
1516        }
1517        let state = pool.tx_alloc_state.lock();
1518        assert_eq!(state.free_list.len, 1);
1519        assert!(state.requests.is_empty());
1520    }
1521
1522    #[test]
1523    fn multiple_blocking_alloc_tx_fulfill_order() {
1524        const TASKS_TOTAL: usize = 3;
1525        let mut executor = fasync::TestExecutor::new();
1526        let pool = Pool::new_test_default();
1527        let mut allocated = pool.alloc_tx_all(1);
1528        let mut alloc_futs = (1..=TASKS_TOTAL)
1529            .rev()
1530            .map(|x| {
1531                let pool = pool.clone();
1532                (x, Box::pin(async move { pool.alloc_tx_checked(x.try_into().unwrap()).await }))
1533            })
1534            .collect::<Vec<_>>();
1535
1536        for (idx, (req_size, task)) in alloc_futs.iter_mut().enumerate() {
1537            assert_matches!(executor.run_until_stalled(task), Poll::Pending);
1538            // Assert that the tasks are sorted in decreasing order of requested size.
1539            assert_eq!(idx + *req_size, TASKS_TOTAL);
1540        }
1541        {
1542            let state = pool.tx_alloc_state.lock();
1543            // The first pending request was introduced by `alloc_tx_all`.
1544            assert_eq!(state.requests.len(), TASKS_TOTAL + 1);
1545            let mut requests = state.requests.iter();
1546            // It should already be cancelled because the requesting future has
1547            // already been dropped (sketched in isolation after this test).
1548            assert!(requests.next().unwrap().sender.is_canceled());
1549            // The rest of the requests must not be cancelled.
1550            assert!(requests.all(|req| !req.sender.is_canceled()))
1551        }
1552
1553        let mut to_free = Vec::new();
1554        let mut freed = 0;
1555        for free_size in (1..=TASKS_TOTAL).rev() {
1556            let (_req_size, mut task) = alloc_futs.remove(0);
1557            for _ in 1..free_size {
1558                freed += 1;
1559                assert_matches!(
1560                    allocated.pop(),
1561                    Some(AllocGuard { ref descs, pool: ref p })
1562                        if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
1563                );
1564                assert_matches!(executor.run_until_stalled(&mut task), Poll::Pending);
1565            }
1566            freed += 1;
1567            assert_matches!(
1568                allocated.pop(),
1569                Some(AllocGuard { ref descs, pool: ref p })
1570                    if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
1571            );
1572            match executor.run_until_stalled(&mut task) {
1573                Poll::Ready(alloc) => {
1574                    assert_eq!(alloc.len(), free_size);
1575                    // Don't return the allocation to the pool yet.
1576                    to_free.push(alloc);
1577                }
1578                Poll::Pending => panic!("The request should be fulfilled"),
1579            }
1580            // The rest of the requests cannot be fulfilled.
1581            for (_req_size, task) in alloc_futs.iter_mut() {
1582                assert_matches!(executor.run_until_stalled(task), Poll::Pending);
1583            }
1584        }
1585        assert!(pool.tx_alloc_state.lock().requests.is_empty());
1586    }
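
    // A minimal sketch isolating the leftover request noted in
    // `multiple_blocking_alloc_tx_fulfill_order`, assuming `alloc_tx_all`
    // leaves behind the dropped (hence cancelled) request observed there.
    #[test]
    fn alloc_tx_all_leaves_cancelled_request() {
        let pool = Pool::new_test_default();
        let _allocated = pool.alloc_tx_all(1);
        let state = pool.tx_alloc_state.lock();
        assert_eq!(state.requests.len(), 1);
        assert!(state.requests.front().unwrap().sender.is_canceled());
    }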
1587
1588    #[test]
1589    fn singleton_tx_layout() {
1590        let pool = Pool::new_test_default();
1591        let buffers = std::iter::from_fn(|| {
1592            let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1593                - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1594                - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1595            pool.alloc_tx_buffer_now_or_never(usize::try_from(data_len).unwrap()).map(|buffer| {
1596                assert_eq!(buffer.alloc.descriptors().count(), 1);
1597                let offset = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1598                    * u64::from(buffer.alloc[0].get());
1599                {
1600                    let descriptor = buffer.alloc.descriptor();
1601                    assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
1602                    assert_eq!(descriptor.head_length(), DEFAULT_MIN_TX_BUFFER_HEAD);
1603                    assert_eq!(descriptor.tail_length(), DEFAULT_MIN_TX_BUFFER_TAIL);
1604                    assert_eq!(descriptor.data_length(), data_len);
1605                    assert_eq!(descriptor.offset(), offset);
1606                }
1607
1608                assert_eq!(buffer.parts.len(), 1);
1609                let BufferPart { ptr, len, cap } = buffer.parts[0];
1610                assert_eq!(len, 0);
1611                assert_eq!(
1612                    // Using wrapping_add because we will never dereference the
1613                    // resulting pointer and it saves us an unsafe block.
1614                    pool.base.as_ptr().wrapping_add(
1615                        usize::try_from(offset).unwrap() + usize::from(DEFAULT_MIN_TX_BUFFER_HEAD),
1616                    ),
1617                    ptr
1618                );
1619                assert_eq!(data_len, u32::try_from(cap).unwrap());
1620                buffer
1621            })
1622        })
1623        .collect::<Vec<_>>();
1624        assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()));
1625    }
1626
1627    #[test]
1628    fn chained_tx_layout() {
1629        let pool = Pool::new_test_default();
1630        let alloc_len = 4 * DEFAULT_BUFFER_LENGTH.get()
1631            - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1632            - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1633        let buffers = std::iter::from_fn(|| {
1634            pool.alloc_tx_buffer_now_or_never(alloc_len).map(|buffer| {
1635                assert_eq!(buffer.parts.len(), 4);
1636                for (idx, descriptor) in buffer.alloc.descriptors().enumerate() {
1637                    let chain_length = ChainLength::try_from(buffer.alloc.len() - idx - 1).unwrap();
1638                    let head_length = if idx == 0 { DEFAULT_MIN_TX_BUFFER_HEAD } else { 0 };
1639                    let tail_length = if chain_length == ChainLength::ZERO {
1640                        DEFAULT_MIN_TX_BUFFER_TAIL
1641                    } else {
1642                        0
1643                    };
1644                    let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1645                        - u32::from(head_length)
1646                        - u32::from(tail_length);
1647                    let offset = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1648                        * u64::from(buffer.alloc[idx].get());
1649                    assert_eq!(descriptor.chain_length().unwrap(), chain_length);
1650                    assert_eq!(descriptor.head_length(), head_length);
1651                    assert_eq!(descriptor.tail_length(), tail_length);
1652                    assert_eq!(descriptor.offset(), offset);
1653                    assert_eq!(descriptor.data_length(), data_len);
1654                    if chain_length != ChainLength::ZERO {
1655                        assert_eq!(descriptor.nxt(), Some(buffer.alloc[idx + 1].get()));
1656                    }
1657
1658                    let BufferPart { ptr, cap, len } = buffer.parts[idx];
1659                    assert_eq!(len, 0);
1660                    assert_eq!(
1661                        // Using wrapping_add because we will never dereference
1662                        // the resulting ptr and it saves us an unsafe block.
1663                        pool.base.as_ptr().wrapping_add(
1664                            usize::try_from(offset).unwrap() + usize::from(head_length),
1665                        ),
1666                        ptr
1667                    );
1668                    assert_eq!(data_len, u32::try_from(cap).unwrap());
1669                }
1670                buffer
1671            })
1672        })
1673        .collect::<Vec<_>>();
1674        assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()) / 4);
1675    }
1676
1677    #[test]
1678    fn rx_distinct() {
1679        let pool = Pool::new_test_default();
1680        let mut guard = pool.rx_pending.inner.lock();
1681        let (descs, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
1682        assert_eq!(descs.len(), usize::from(DEFAULT_RX_BUFFERS.get()));
1683        let distinct = descs.iter().map(|desc| desc.get()).collect::<HashSet<u16>>();
1684        assert_eq!(descs.len(), distinct.len());
1685    }
1686
1687    #[test]
1688    fn alloc_rx_layout() {
1689        let pool = Pool::new_test_default();
1690        let mut guard = pool.rx_pending.inner.lock();
1691        let (descs, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
1692        assert_eq!(descs.len(), usize::from(DEFAULT_RX_BUFFERS.get()));
1693        for desc in descs.iter() {
1694            let descriptor = pool.descriptors.borrow(desc);
1695            let offset =
1696                u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap() * u64::from(desc.get());
1697            assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
1698            assert_eq!(descriptor.head_length(), 0);
1699            assert_eq!(descriptor.tail_length(), 0);
1700            assert_eq!(descriptor.offset(), offset);
1701            assert_eq!(
1702                descriptor.data_length(),
1703                u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1704            );
1705        }
1706    }
1707
1708    #[test]
1709    fn buffer_read_at_write_at() {
1710        let pool = Pool::new_test_default();
1711        let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
1712        let mut buffer =
1713            pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
1714        // Because we have to accommodate the space for the head and tail, there
1715        // are 2 parts instead of 1 (see the sketch after this test).
1716        assert_eq!(buffer.parts.len(), 2);
1717        assert_eq!(buffer.cap(), alloc_bytes);
1718        let write_buf = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();
1719        buffer.write_at(0, &write_buf[..]).expect("failed to write into buffer");
1720        let mut read_buf = [0xff; DEFAULT_BUFFER_LENGTH.get()];
1721        buffer.read_at(0, &mut read_buf[..]).expect("failed to read from buffer");
1722        for (idx, byte) in read_buf.iter().enumerate() {
1723            assert_eq!(*byte, write_buf[idx]);
1724        }
1725    }
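
    // A minimal sketch of the head/tail accounting noted in
    // `buffer_read_at_write_at`, assuming the layout exercised in
    // `singleton_tx_layout`: requesting exactly the single-descriptor data
    // capacity yields one part, while requesting the full buffer length
    // spills into a second part.
    #[test]
    fn buffer_parts_head_tail_accounting() {
        let pool = Pool::new_test_default();
        let single_part_len = DEFAULT_BUFFER_LENGTH.get()
            - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
            - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL);
        let buffer = pool
            .alloc_tx_buffer_now_or_never(single_part_len)
            .expect("failed to allocate");
        assert_eq!(buffer.parts.len(), 1);
        // Return the descriptor before allocating again.
        drop(buffer);
        let buffer = pool
            .alloc_tx_buffer_now_or_never(DEFAULT_BUFFER_LENGTH.get())
            .expect("failed to allocate");
        assert_eq!(buffer.parts.len(), 2);
    }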
1726
1727    #[test]
1728    fn buffer_read_write_seek() {
1729        let pool = Pool::new_test_default();
1730        let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
1731        let mut buffer =
1732            pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
1733        // Because we have to accommodate the space for the head and tail, there
1734        // are 2 parts instead of 1.
1735        assert_eq!(buffer.parts.len(), 2);
1736        assert_eq!(buffer.cap(), alloc_bytes);
1737        let write_buf = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();
1738        assert_eq!(
1739            buffer.write(&write_buf[..]).expect("failed to write into buffer"),
1740            write_buf.len()
1741        );
1742        const SEEK_FROM_END: usize = 64;
1743        const READ_LEN: usize = 12;
1744        assert_eq!(
1745            buffer.seek(SeekFrom::End(-i64::try_from(SEEK_FROM_END).unwrap())).unwrap(),
1746            u64::try_from(buffer.cap() - SEEK_FROM_END).unwrap()
1747        );
1748        let mut read_buf = [0xff; READ_LEN];
1749        assert_eq!(
1750            buffer.read(&mut read_buf[..]).expect("failed to read from buffer"),
1751            read_buf.len()
1752        );
1753        assert_eq!(&write_buf[..READ_LEN], &read_buf[..]);
1754    }
1755
1756    #[test_case(32; "single buffer part")]
1757    #[test_case(MAX_BUFFER_BYTES; "multiple buffer parts")]
1758    fn buffer_pad(pad_size: usize) {
1759        let mut pool = Pool::new_test_default();
1760        pool.set_min_tx_buffer_length(pad_size);
1761        for offset in 0..pad_size {
1762            Arc::get_mut(&mut pool)
1763                .expect("there are multiple owners of the underlying VMO")
1764                .fill_sentinel_bytes();
1765            let mut buffer =
1766                pool.alloc_tx_buffer_now_or_never(pad_size).expect("failed to allocate buffer");
1767            buffer.check_write_and_pad(offset, pad_size);
1768        }
1769    }
1770
1771    #[test]
1772    fn buffer_pad_grow() {
1773        const BUFFER_PARTS: u8 = 3;
1774        let mut pool = Pool::new_test_default();
1775        let pad_size = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1776            * u32::from(BUFFER_PARTS)
1777            - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1778            - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1779        pool.set_min_tx_buffer_length(pad_size.try_into().unwrap());
1780        for offset in 0..pad_size - u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap() {
1781            Arc::get_mut(&mut pool)
1782                .expect("there are multiple owners of the underlying VMO")
1783                .fill_sentinel_bytes();
1784            let mut alloc =
1785                pool.alloc_tx_now_or_never(BUFFER_PARTS).expect("failed to alloc descriptors");
1786            alloc
1787                .init(
1788                    DEFAULT_BUFFER_LENGTH.get() * usize::from(BUFFER_PARTS)
1789                        - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1790                        - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL),
1791                )
1792                .expect("head/body/tail sizes are representable with u16/u32/u16");
1793            let mut buffer = Buffer::try_from(alloc).unwrap();
1794            buffer.check_write_and_pad(offset.try_into().unwrap(), pad_size.try_into().unwrap());
1795        }
1796    }
1797
1798    #[test_case(  0; "writes at the beginning")]
1799    #[test_case( 15; "writes in the first part")]
1800    #[test_case( 75; "writes in the second part")]
1801    #[test_case(135; "writes in the third part")]
1802    #[test_case(195; "writes in the last part")]
1803    fn buffer_used(write_offset: usize) {
1804        let pool = Pool::new_test_default();
1805        let mut buffer =
1806            pool.alloc_tx_buffer_now_or_never(MAX_BUFFER_BYTES).expect("failed to allocate buffer");
1807        let expected_caps = (0..netdev::MAX_DESCRIPTOR_CHAIN).map(|i| {
1808            if i == 0 {
1809                DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1810            } else if i < netdev::MAX_DESCRIPTOR_CHAIN - 1 {
1811                DEFAULT_BUFFER_LENGTH.get()
1812            } else {
1813                DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL)
1814            }
1815        });
1816        assert_eq!(buffer.alloc.len(), netdev::MAX_DESCRIPTOR_CHAIN.into());
1817        buffer.write_at(write_offset, &[WRITE_BYTE][..]).expect("failed to write to buffer");
1818        // The accumulator is `Some` while we haven't yet found the part where
1819        // the byte was written, and `None` once we have found it.
1820        assert_eq!(
1821            buffer.parts.iter().zip(expected_caps).fold(
1822                Some(write_offset),
1823                |offset, (part, expected_cap)| {
1824                    // The cap must match the expectation.
1825                    assert_eq!(part.cap, expected_cap);
1826
1827                    match offset {
1828                        Some(offset) => {
1829                            if offset >= expected_cap {
1830                                // The part should have used all the capacity.
1831                                assert_eq!(part.len, part.cap);
1832                                Some(offset - part.len)
1833                            } else {
1834                                // The part should end right after our byte.
1835                                assert_eq!(part.len, offset + 1);
1836                                let mut buf = [0];
1837                                // Verify that the byte is indeed written.
1838                                assert_matches!(part.read_at(offset, &mut buf), Ok(1));
1839                                assert_eq!(buf[0], WRITE_BYTE);
1840                                None
1841                            }
1842                        }
1843                        None => {
1844                            // We should have never written in this part.
1845                            assert_eq!(part.len, 0);
1846                            None
1847                        }
1848                    }
1849                }
1850            ),
1851            None
1852        )
1853    }
1854
1855    #[test]
1856    fn buffer_commit() {
1857        let pool = Pool::new_test_default();
1858        for offset in 0..MAX_BUFFER_BYTES {
1859            let mut buffer = pool
1860                .alloc_tx_buffer_now_or_never(MAX_BUFFER_BYTES)
1861                .expect("failed to allocate buffer");
1862            buffer.write_at(offset, &[1][..]).expect("failed to write to buffer");
1863            buffer.commit();
1864            for (part, descriptor) in buffer.parts.iter().zip(buffer.alloc.descriptors()) {
1865                let head_length = descriptor.head_length();
1866                let tail_length = descriptor.tail_length();
1867                let data_length = descriptor.data_length();
1868                assert_eq!(u32::try_from(part.len).unwrap(), data_length);
1869                assert_eq!(
1870                    u32::from(head_length + tail_length) + data_length,
1871                    u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap(),
1872                );
1873            }
1874        }
1875    }
1876
1877    #[test]
1878    fn allocate_under_device_minimum() {
1879        const MIN_TX_DATA: usize = 32;
1880        const ALLOC_SIZE: usize = 16;
1881        const WRITE_BYTE: u8 = 0xff;
1882        const WRITE_SENTINEL_BYTE: u8 = 0xee;
1883        const READ_SENTINEL_BYTE: u8 = 0xdd;
1884        let mut config = DEFAULT_CONFIG;
1885        config.buffer_layout.min_tx_data = MIN_TX_DATA;
1886        let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create a new pool");
1887        for mut buffer in Vec::from_iter(std::iter::from_fn({
1888            let pool = pool.clone();
1889            move || pool.alloc_tx_buffer_now_or_never(MIN_TX_DATA)
1890        })) {
1891            buffer.write_at(0, &[WRITE_SENTINEL_BYTE; MIN_TX_DATA]).expect("failed to write");
1892        }
1893        let mut allocated =
1894            pool.alloc_tx_buffer_now_or_never(ALLOC_SIZE).expect("failed to allocate buffer");
1895        assert_eq!(allocated.cap(), ALLOC_SIZE);
1896        const WRITE_BUF_SIZE: usize = ALLOC_SIZE + 1;
1897        assert_matches!(
1898            allocated.write_at(0, &[WRITE_BYTE; WRITE_BUF_SIZE]),
1899            Err(Error::TooSmall { size: ALLOC_SIZE, offset: 0, length: WRITE_BUF_SIZE })
1900        );
1901        allocated.write_at(0, &[WRITE_BYTE; ALLOC_SIZE]).expect("failed to write to buffer");
1902        assert_matches!(allocated.pad(), Ok(()));
1903        assert_eq!(allocated.cap(), MIN_TX_DATA);
1904        assert_eq!(allocated.len(), MIN_TX_DATA);
1905        const READ_BUF_SIZE: usize = MIN_TX_DATA + 1;
1906        let mut read_buf = [READ_SENTINEL_BYTE; READ_BUF_SIZE];
1907        assert_matches!(
1908            allocated.read_at(0, &mut read_buf[..]),
1909            Err(Error::TooSmall { size: MIN_TX_DATA, offset: 0, length: READ_BUF_SIZE })
1910        );
1911        allocated.read_at(0, &mut read_buf[..MIN_TX_DATA]).expect("failed to read from buffer");
1912        assert_eq!(&read_buf[..ALLOC_SIZE], &[WRITE_BYTE; ALLOC_SIZE][..]);
1913        assert_eq!(&read_buf[ALLOC_SIZE..MIN_TX_DATA], &[0x0; MIN_TX_DATA - ALLOC_SIZE][..]);
1914        assert_eq!(&read_buf[MIN_TX_DATA..], &[READ_SENTINEL_BYTE; 1][..]);
1915    }
1916
1917    #[test]
1918    fn invalid_tx_length() {
1919        let mut config = DEFAULT_CONFIG;
1920        config.buffer_layout.length = usize::from(u16::MAX) + 2;
1921        config.buffer_layout.min_tx_head = 0;
1922        let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create pool");
1923        assert_matches!(pool.alloc_tx_buffer(1).now_or_never(), Some(Err(Error::TxLength)));
1924    }
1925
1926    #[test]
1927    fn rx_leases() {
1928        let mut executor = fasync::TestExecutor::new();
1929        let state = RxLeaseHandlingState::new_with_enabled(true);
1930        let mut watcher = RxLeaseWatcher { state: &state };
1931
1932        {
1933            let mut fut = pin!(watcher.wait_until(0));
1934            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
1935        }
1936        {
1937            state.rx_complete();
1938            let mut fut = pin!(watcher.wait_until(1));
1939            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
1940        }
1941        {
1942            let mut fut = pin!(watcher.wait_until(0));
1943            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
1944        }
1945        {
1946            let mut fut = pin!(watcher.wait_until(3));
1947            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
1948            state.rx_complete();
1949            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
1950            state.rx_complete();
1951            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
1952        }
1953        // Dropping the wait future without seeing it complete restores the
1954        // value.
1955        let counter_before = state.rx_frame_counter.load(atomic::Ordering::SeqCst);
1956        {
1957            let mut fut = pin!(watcher.wait_until(10000));
1958            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
1959        }
1960        let counter_after = state.rx_frame_counter.load(atomic::Ordering::SeqCst);
1961        assert_eq!(counter_before, counter_after);
1962    }
1963}