Skip to main content

netdevice_client/session/buffer/
pool.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Fuchsia netdevice buffer pool.
6
7use fuchsia_sync::Mutex;
8use futures::task::AtomicWaker;
9use std::borrow::Borrow;
10use std::collections::VecDeque;
11use std::convert::TryInto as _;
12use std::fmt::Debug;
13use std::io::{Read, Seek, SeekFrom, Write};
14use std::mem::MaybeUninit;
15use std::num::TryFromIntError;
16use std::ops::{Deref, DerefMut};
17use std::ptr::NonNull;
18use std::sync::Arc;
19use std::sync::atomic::{self, AtomicBool, AtomicU64};
20use std::task::Poll;
21
22use arrayvec::ArrayVec;
23use explicit::ResultExt as _;
24use fidl_fuchsia_hardware_network as netdev;
25use fuchsia_runtime::vmar_root_self;
26use futures::channel::oneshot::{Receiver, Sender, channel};
27
28use super::{ChainLength, DescId, DescRef, DescRefMut, Descriptors};
29use crate::error::{Error, Result};
30use crate::session::{BufferLayout, Config, Pending, Port};
31
32/// Responsible for managing [`Buffer`]s for a [`Session`](crate::session::Session).
33pub(in crate::session) struct Pool {
34    /// Base address of the pool.
35    // Note: This field requires us to manually implement `Sync` and `Send`.
36    base: NonNull<u8>,
37    /// The length of the pool in bytes.
38    bytes: usize,
39    /// The descriptors allocated for the pool.
40    descriptors: Descriptors,
41    /// Shared state for allocation.
42    tx_alloc_state: Mutex<TxAllocState>,
43    /// The free rx descriptors pending to be sent to driver.
44    pub(in crate::session) rx_pending: Pending<Rx>,
45    /// The buffer layout.
46    buffer_layout: BufferLayout,
47    /// State-keeping allowing sessions to handle rx leases.
48    rx_leases: RxLeaseHandlingState,
49}
50
51// `Pool` is `Send` and `Sync`, and this allows the compiler to deduce `Buffer`
52// to be `Send`. These impls are safe because we can safely share `Pool` and
53// `&Pool`: the implementation would never allocate the same buffer to two
54// callers at the same time.
55unsafe impl Send for Pool {}
56unsafe impl Sync for Pool {}
57
58/// The shared state which keeps track of available buffers and tx buffers.
59struct TxAllocState {
60    /// All pending tx allocation requests.
61    requests: VecDeque<TxAllocReq>,
62    free_list: TxFreeList,
63}
64
65/// We use a linked list to maintain the tx free descriptors - they are linked
66/// through their `nxt` fields, note this differs from the chaining expected
67/// by the network device protocol:
68/// - You can chain more than [`netdev::MAX_DESCRIPTOR_CHAIN`] descriptors
69///   together.
70/// - the free-list ends when the `nxt` field is 0xff, while the normal chain
71///   ends when `chain_length` becomes 0.
72struct TxFreeList {
73    /// The head of a linked list of available descriptors that can be allocated
74    /// for tx.
75    head: Option<DescId<Tx>>,
76    /// How many free descriptors are there in the pool.
77    len: u16,
78}
79
80impl Pool {
81    /// Creates a new [`Pool`] and its backing [`zx::Vmo`]s.
82    ///
83    /// Returns [`Pool`] and the [`zx::Vmo`]s for descriptors and data, in that
84    /// order.
85    pub(in crate::session) fn new(config: Config) -> Result<(Arc<Self>, zx::Vmo, zx::Vmo)> {
86        let Config { buffer_stride, num_rx_buffers, num_tx_buffers, options, buffer_layout } =
87            config;
88        let num_buffers = num_rx_buffers.get() + num_tx_buffers.get();
89        let (descriptors, descriptors_vmo, tx_free, mut rx_free) =
90            Descriptors::new(num_tx_buffers, num_rx_buffers, buffer_stride)?;
91
92        // Construct the free list.
93        let free_head = tx_free.into_iter().rev().fold(None, |head, mut curr| {
94            descriptors.borrow_mut(&mut curr).set_nxt(head);
95            Some(curr)
96        });
97
98        for rx_desc in rx_free.iter_mut() {
99            descriptors.borrow_mut(rx_desc).initialize(
100                ChainLength::ZERO,
101                0,
102                buffer_layout.length.try_into().unwrap(),
103                0,
104            );
105        }
106
107        let tx_alloc_state = TxAllocState {
108            free_list: TxFreeList { head: free_head, len: num_tx_buffers.get() },
109            requests: VecDeque::new(),
110        };
111
112        let size = buffer_stride.get() * u64::from(num_buffers);
113        let data_vmo = zx::Vmo::create(size).map_err(|status| Error::Vmo("data", status))?;
114
115        const VMO_NAME: zx::Name =
116            const_unwrap::const_unwrap_result(zx::Name::new("netdevice:data"));
117        data_vmo.set_name(&VMO_NAME).map_err(|status| Error::Vmo("set name", status))?;
118        // `as` is OK because `size` is positive and smaller than isize::MAX.
119        // This is following the practice of rust stdlib to ensure allocation
120        // size never reaches isize::MAX.
121        // https://doc.rust-lang.org/std/primitive.pointer.html#method.add-1.
122        let len = isize::try_from(size).expect("VMO size larger than isize::MAX") as usize;
123        // The returned address of zx_vmar_map on success must be non-zero:
124        // https://fuchsia.dev/fuchsia-src/reference/syscalls/vmar_map
125        let base = NonNull::new(
126            vmar_root_self()
127                .map(0, &data_vmo, 0, len, zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE)
128                .map_err(|status| Error::Map("data", status))? as *mut u8,
129        )
130        .unwrap();
131
132        Ok((
133            Arc::new(Pool {
134                base,
135                bytes: len,
136                descriptors,
137                tx_alloc_state: Mutex::new(tx_alloc_state),
138                rx_pending: Pending::new(rx_free),
139                buffer_layout,
140                rx_leases: RxLeaseHandlingState::new_with_flags(options),
141            }),
142            descriptors_vmo,
143            data_vmo,
144        ))
145    }
146
147    /// Allocates `num_parts` tx descriptors.
148    ///
149    /// It will block if there are not enough descriptors. Note that the
150    /// descriptors are not initialized, you need to call [`AllocGuard::init()`]
151    /// on the returned [`AllocGuard`] if you want to send it to the driver
152    /// later. See [`AllocGuard<Rx>::into_tx()`] for an example where
153    /// [`AllocGuard::init()`] is not needed because the tx allocation will be
154    /// returned to the pool immediately and won't be sent to the driver.
155    pub(in crate::session) async fn alloc_tx(
156        self: &Arc<Self>,
157        num_parts: ChainLength,
158    ) -> AllocGuard<Tx> {
159        let receiver = {
160            let mut state = self.tx_alloc_state.lock();
161            match state.free_list.try_alloc(num_parts, &self.descriptors) {
162                Some(allocated) => {
163                    return AllocGuard::new(allocated, self.clone());
164                }
165                None => {
166                    let (request, receiver) = TxAllocReq::new(num_parts);
167                    state.requests.push_back(request);
168                    receiver
169                }
170            }
171        };
172        // The sender must not be dropped.
173        receiver.await.unwrap()
174    }
175
176    /// Tries to allocate a [`SinglePartTxBuffer`].
177    ///
178    /// Returns `Ok(None)` if there is no available buffer, or `Err(Error::TxLength)`
179    /// if the requested size cannot meet the device requirement.
180    pub(in crate::session) fn try_alloc_single_part_tx_buffer(
181        self: &Arc<Self>,
182        num_bytes: usize,
183    ) -> Result<Option<SinglePartTxBuffer>> {
184        let BufferLayout { min_tx_data: _, min_tx_head, min_tx_tail, length: buffer_length } =
185            self.buffer_layout;
186        if num_bytes > buffer_length - usize::from(min_tx_head) - usize::from(min_tx_tail) {
187            return Err(Error::TxLength);
188        }
189        self.tx_alloc_state
190            .lock()
191            .free_list
192            .try_alloc(ChainLength::try_from(1u8).unwrap(), &self.descriptors)
193            .map(|allocated| -> Result<SinglePartTxBuffer> {
194                let mut alloc = AllocGuard::new(allocated, self.clone());
195                alloc.init(num_bytes)?;
196                let buffer = Buffer::from(alloc);
197                Ok(SinglePartTxBuffer::new(buffer, num_bytes).expect("must be single part"))
198            })
199            .transpose()
200    }
201
202    /// Allocates a tx [`Buffer`].
203    ///
204    /// The returned buffer will have `num_bytes` as its capacity, the method
205    /// will block if there are not enough buffers. An error will be returned if
206    /// the requested size cannot meet the device requirement, for example, if
207    /// the size of the head or tail region will become unrepresentable in u16.
208    pub(in crate::session) async fn alloc_tx_buffer(
209        self: &Arc<Self>,
210        num_bytes: usize,
211    ) -> Result<Buffer<Tx>> {
212        self.alloc_tx_buffers(num_bytes).await?.next().unwrap()
213    }
214
215    /// Waits for at least one TX buffer to be available and returns an iterator
216    /// of buffers with `num_bytes` as capacity.
217    ///
218    /// The returned iterator is guaranteed to yield at least one item (though
219    /// it might be an error if the requested size cannot meet the device
220    /// requirement).
221    ///
222    /// # Note
223    ///
224    /// Given a `Buffer<Tx>` is returned to the pool when it's dropped, the
225    /// returned iterator will seemingly yield infinite items if the yielded
226    /// `Buffer`s are dropped while iterating.
227    pub(in crate::session) async fn alloc_tx_buffers<'a>(
228        self: &'a Arc<Self>,
229        num_bytes: usize,
230    ) -> Result<impl Iterator<Item = Result<Buffer<Tx>>> + 'a> {
231        let BufferLayout { min_tx_data, min_tx_head, min_tx_tail, length: buffer_length } =
232            self.buffer_layout;
233        let tx_head = usize::from(min_tx_head);
234        let tx_tail = usize::from(min_tx_tail);
235        let total_bytes = num_bytes.max(min_tx_data) + tx_head + tx_tail;
236        let num_parts = (total_bytes + buffer_length - 1) / buffer_length;
237        let chain_length = ChainLength::try_from(num_parts)?;
238        let first = self.alloc_tx(chain_length).await;
239        let iter = std::iter::once(first)
240            .chain(std::iter::from_fn(move || {
241                let mut state = self.tx_alloc_state.lock();
242                state
243                    .free_list
244                    .try_alloc(chain_length, &self.descriptors)
245                    .map(|allocated| AllocGuard::new(allocated, self.clone()))
246            }))
247            // Fuse afterwards so we're guaranteeing we can't see a new entry
248            // after having yielded `None` once.
249            .fuse()
250            .map(move |mut alloc| {
251                alloc.init(num_bytes)?;
252                Ok(alloc.into())
253            });
254        Ok(iter)
255    }
256
257    /// Frees rx descriptors.
258    pub(in crate::session) fn free_rx(&self, descs: impl IntoIterator<Item = DescId<Rx>>) {
259        self.rx_pending.extend(descs.into_iter().map(|mut desc| {
260            self.descriptors.borrow_mut(&mut desc).initialize(
261                ChainLength::ZERO,
262                0,
263                self.buffer_layout.length.try_into().unwrap(),
264                0,
265            );
266            desc
267        }));
268    }
269
270    /// Frees tx descriptors.
271    ///
272    /// # Panics
273    ///
274    /// Panics if given an empty chain.
275    fn free_tx(self: &Arc<Self>, chain: Chained<DescId<Tx>>) {
276        // We store any pending request that need to be fulfilled in the stack
277        // here, to fulfill them only once we drop the lock, guaranteeing an
278        // AllocGuard can't be dropped while the lock is held.
279        let mut to_fulfill = ArrayVec::<
280            (TxAllocReq, AllocGuard<Tx>),
281            { netdev::MAX_DESCRIPTOR_CHAIN as usize },
282        >::new();
283
284        let mut state = self.tx_alloc_state.lock();
285
286        {
287            let mut descs = chain.into_iter();
288            // The following can't overflow because we can have at most u16::MAX
289            // descriptors: free_len + #(to_free) + #(descs in use) <= u16::MAX,
290            // Thus free_len + #(to_free) <= u16::MAX.
291            state.free_list.len += u16::try_from(descs.len()).unwrap();
292            let head = descs.next();
293            let old_head = std::mem::replace(&mut state.free_list.head, head);
294            let mut tail = descs.last();
295            let mut tail_ref = self.descriptors.borrow_mut(
296                tail.as_mut().unwrap_or_else(|| state.free_list.head.as_mut().unwrap()),
297            );
298            tail_ref.set_nxt(old_head);
299        }
300
301        // After putting the chain back into the free list, we try to fulfill
302        // any pending tx allocation requests.
303        while let Some(req) = state.requests.front() {
304            // Skip a request that we know is canceled.
305            //
306            // This is an optimization for long-ago dropped requests, since the
307            // receiver side can be dropped between here and fulfillment later.
308            if req.sender.is_canceled() {
309                let _cancelled: Option<TxAllocReq> = state.requests.pop_front();
310                continue;
311            }
312            let size = req.size;
313            match state.free_list.try_alloc(size, &self.descriptors) {
314                Some(descs) => {
315                    // The unwrap is safe because we know requests is not empty.
316                    let req = state.requests.pop_front().unwrap();
317                    to_fulfill.push((req, AllocGuard::new(descs, self.clone())));
318
319                    // If we're full temporarily release the lock to go again
320                    // later. Fulfilling a request must _always_ be done without
321                    // holding the lock.
322                    if to_fulfill.is_full() {
323                        drop(state);
324                        for (req, alloc) in to_fulfill.drain(..) {
325                            req.fulfill(alloc)
326                        }
327                        state = self.tx_alloc_state.lock();
328                    }
329                }
330                None => break,
331            }
332        }
333
334        // Make sure we're not holding the state lock when fulfilling requests.
335        drop(state);
336        // Fulfill any ready requests.
337        for (req, alloc) in to_fulfill {
338            req.fulfill(alloc)
339        }
340    }
341
342    /// Frees the completed tx descriptors chained by head to the pool.
343    ///
344    /// Call this function when the driver hands back a completed tx descriptor.
345    pub(in crate::session) fn tx_completed(self: &Arc<Self>, head: DescId<Tx>) -> Result<()> {
346        let chain = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
347        Ok(self.free_tx(chain))
348    }
349
350    /// Creates a [`Buffer<Rx>`] corresponding to the completed rx descriptors.
351    ///
352    /// Whenever the driver hands back a completed rx descriptor, this function
353    /// can be used to create the buffer that is represented by those chained
354    /// descriptors.
355    pub(in crate::session) fn rx_completed(
356        self: &Arc<Self>,
357        head: DescId<Rx>,
358    ) -> Result<Buffer<Rx>> {
359        let descs = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
360        let alloc = AllocGuard::new(descs, self.clone());
361        Ok(alloc.into())
362    }
363
364    fn get_slice<'a, K: AllocKind>(&self, desc: &'a DescId<K>) -> &'a [u8] {
365        let desc = self.descriptors.borrow(desc);
366        let offset = usize::try_from(desc.offset() + u64::from(desc.head_length()))
367            .expect("usize must hold u64");
368        let len = usize::try_from(desc.data_length()).expect("usize must hold u32");
369        // Safety: The descriptor is describing a buffer from this pool. It must
370        // be valid to create a slice into that region. We hold a immutable
371        // reference to the underlying descriptor, this means no one else should
372        // have mutable reference to this memory region.
373        unsafe {
374            let ptr = self.base.as_ptr().add(offset);
375            std::slice::from_raw_parts(ptr, len)
376        }
377    }
378
379    fn get_slice_mut<'a, K: AllocKind>(&self, desc: &'a mut DescId<K>) -> &'a mut [u8] {
380        let desc = self.descriptors.borrow_mut(desc);
381        let offset = usize::try_from(desc.offset() + u64::from(desc.head_length()))
382            .expect("usize must hold u64");
383        let len = usize::try_from(desc.data_length()).expect("usize must hold u32");
384        // Safety: The descriptor is describing a buffer from this pool. It must
385        // be valid to create a slice into that region. We hold a mutable
386        // reference to the underlying descriptor, this means we are currently
387        // the only one has access to this memory region.
388        unsafe {
389            let ptr = self.base.as_ptr().add(offset);
390            std::slice::from_raw_parts_mut(ptr, len)
391        }
392    }
393}
394
395impl Drop for Pool {
396    fn drop(&mut self) {
397        unsafe {
398            vmar_root_self()
399                .unmap(self.base.as_ptr() as usize, self.bytes)
400                .expect("failed to unmap VMO for Pool")
401        }
402    }
403}
404
405impl TxFreeList {
406    /// Tries to allocate tx descriptors.
407    ///
408    /// Returns [`None`] if there are not enough descriptors.
409    fn try_alloc(
410        &mut self,
411        num_parts: ChainLength,
412        descriptors: &Descriptors,
413    ) -> Option<Chained<DescId<Tx>>> {
414        if u16::from(num_parts.get()) > self.len {
415            return None;
416        }
417
418        let free_list = std::iter::from_fn(|| -> Option<DescId<Tx>> {
419            let new_head = self.head.as_ref().and_then(|head| {
420                let nxt = descriptors.borrow(head).nxt();
421                nxt.map(|id| unsafe {
422                    // Safety: This is the nxt field of head of the free list,
423                    // it must be a tx descriptor id.
424                    DescId::from_raw(id)
425                })
426            });
427            std::mem::replace(&mut self.head, new_head)
428        });
429        let allocated = free_list.take(num_parts.get().into()).collect::<Chained<_>>();
430        assert_eq!(allocated.len(), num_parts.into());
431        self.len -= u16::from(num_parts.get());
432        Some(allocated)
433    }
434}
435
436/// The buffer that can be used by the [`Session`](crate::session::Session).
437pub struct Buffer<K: AllocKind> {
438    /// The descriptors allocation.
439    alloc: AllocGuard<K>,
440}
441
442impl<K: AllocKind> Buffer<K> {
443    /// Returns the length of data region of the buffer.
444    pub fn len(&self) -> usize {
445        self.parts().map(|s| s.len()).sum()
446    }
447
448    /// Returns an iterator over the data slices of the buffer parts.
449    fn parts(&self) -> impl Iterator<Item = &[u8]> + '_ {
450        self.alloc.descs.iter().map(|desc| self.alloc.pool.get_slice(desc))
451    }
452
453    /// Returns an iterator over the mutable valid data slices of the buffer parts.
454    fn parts_mut(&mut self) -> impl Iterator<Item = &mut [u8]> + '_ {
455        self.alloc.descs.iter_mut().map(|desc| self.alloc.pool.get_slice_mut(desc))
456    }
457
458    /// Leaks the underlying buffer descriptors to the driver.
459    pub(in crate::session) fn leak(mut self) -> DescId<K> {
460        let descs = std::mem::replace(&mut self.alloc.descs, Chained::empty());
461        descs.into_iter().next().unwrap()
462    }
463
464    /// Retrieves the frame type of the buffer.
465    pub fn frame_type(&self) -> Result<netdev::FrameType> {
466        self.alloc.descriptor().frame_type()
467    }
468
469    /// Retrieves the buffer's source port.
470    pub fn port(&self) -> Port {
471        self.alloc.descriptor().port()
472    }
473
474    /// Returns the buffer data as a slice.
475    pub fn as_slice(&self) -> Option<&[u8]> {
476        if self.alloc.len() != 1 {
477            return None;
478        }
479        self.parts().next()
480    }
481
482    /// Returns the buffer data as a mutable slice.
483    pub fn as_slice_mut(&mut self) -> Option<&mut [u8]> {
484        if self.alloc.len() != 1 {
485            return None;
486        }
487        self.parts_mut().next()
488    }
489
490    /// Returns a wrapper for read-only operations.
491    pub fn io(&self) -> BufferIORef<'_, K> {
492        let mut len = 0;
493        let parts: Chained<&[u8]> = self.parts().inspect(|s| len += s.len()).collect();
494        BufferIO { parts, pos: 0, len, _marker: std::marker::PhantomData }
495    }
496
497    /// Returns a wrapper for read-write operations.
498    pub fn io_mut(&mut self) -> BufferIOMut<'_, K> {
499        let mut len = 0;
500        let parts: Chained<&mut [u8]> = self.parts_mut().inspect(|s| len += s.len()).collect();
501        BufferIO { parts, pos: 0, len, _marker: std::marker::PhantomData }
502    }
503}
504
505impl Buffer<Tx> {
506    /// Sets the buffer's destination port.
507    pub fn set_port(&mut self, port: Port) {
508        self.alloc.descriptor_mut().set_port(port)
509    }
510
511    /// Sets the frame type of the buffer.
512    pub fn set_frame_type(&mut self, frame_type: netdev::FrameType) {
513        self.alloc.descriptor_mut().set_frame_type(frame_type)
514    }
515
516    /// Sets TxFlags of a Tx buffer.
517    pub fn set_tx_flags(&mut self, flags: netdev::TxFlags) {
518        self.alloc.descriptor_mut().set_tx_flags(flags)
519    }
520}
521
522impl Buffer<Rx> {
523    /// Turns an rx buffer into a tx one.
524    pub async fn into_tx(self) -> Buffer<Tx> {
525        let Buffer { alloc } = self;
526        Buffer { alloc: alloc.into_tx().await }
527    }
528
529    /// Retrieves RxFlags of an Rx Buffer.
530    pub fn rx_flags(&self) -> Result<netdev::RxFlags> {
531        self.alloc.descriptor().rx_flags()
532    }
533}
534
535impl<K: AllocKind> Debug for Buffer<K> {
536    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
537        let Self { alloc } = self;
538        f.debug_struct("Buffer").field("alloc", alloc).finish()
539    }
540}
541
542/// A witness type that proves the buffer is backed by one part only
543/// and thus can be converted into `&[u8]`.
544pub struct SinglePartTxBuffer(Buffer<Tx>);
545
546impl SinglePartTxBuffer {
547    /// Creates a new [`SinglePartTxBuffer`] from a [`Buffer<Tx>`] if it is
548    /// backed by one part only.
549    pub fn new(buffer: Buffer<Tx>, len: usize) -> Option<Self> {
550        if buffer.alloc.len() != 1 {
551            None
552        } else {
553            let cap = usize::try_from(buffer.alloc.descriptor().data_length())
554                .expect("u32 must fit in a usize");
555            if cap < len {
556                return None;
557            }
558            Some(Self(buffer))
559        }
560    }
561
562    /// Converts back to a Tx buffer.
563    pub fn into_inner(self) -> Buffer<Tx> {
564        let Self(buffer) = self;
565        buffer
566    }
567}
568
569impl AsRef<[u8]> for SinglePartTxBuffer {
570    fn as_ref(&self) -> &[u8] {
571        // Safety: `SinglePartTxBuffer` is guaranteed to have exactly one part
572        // (verified on creation), so the first descriptor is always initialized.
573        let desc = unsafe { self.0.alloc.descs.storage[0].assume_init_ref() };
574        self.0.alloc.pool.get_slice(desc)
575    }
576}
577
578impl AsMut<[u8]> for SinglePartTxBuffer {
579    fn as_mut(&mut self) -> &mut [u8] {
580        // Safety: `SinglePartTxBuffer` is guaranteed to have exactly one part
581        // (verified on creation), so the first descriptor is always initialized.
582        let desc = unsafe { self.0.alloc.descs.storage[0].assume_init_mut() };
583        self.0.alloc.pool.get_slice_mut(desc)
584    }
585}
586
587impl packet::FragmentedBuffer for SinglePartTxBuffer {
588    fn len(&self) -> usize {
589        let desc = self.0.alloc.descriptor();
590        usize::try_from(desc.data_length()).expect("u32 must fit in a usize")
591    }
592
593    fn with_bytes<'a, R, F>(&'a self, f: F) -> R
594    where
595        F: for<'b> FnOnce(packet::FragmentedBytes<'b, 'a>) -> R,
596    {
597        f(packet::FragmentedBytes::new(&mut [self.as_ref()][..]))
598    }
599}
600
601/// A wrapper around [`Buffer`] for sequential I/O.
602///
603/// `T` must be a slice reference type, typically `&'a [u8]` for read-only
604/// operations, or `&'a mut [u8]` for read-write operations.
605pub struct BufferIO<T, K: AllocKind> {
606    parts: Chained<T>,
607    pos: usize,
608    len: usize,
609    _marker: std::marker::PhantomData<K>,
610}
611
612pub type BufferIORef<'a, K> = BufferIO<&'a [u8], K>;
613pub type BufferIOMut<'a, K> = BufferIO<&'a mut [u8], K>;
614
615impl<T> BufferIO<T, Tx>
616where
617    T: AsMut<[u8]>,
618{
619    /// Writes data from `src` into the TX buffer starting at the specified `offset`.
620    ///
621    /// This method is infallible. It returns the number of bytes successfully written.
622    ///
623    /// If the specified `offset` is greater than or equal to the total length of the
624    /// buffer, or if the buffer has no remaining capacity at the offset, `0` bytes
625    /// will be written.
626    ///
627    /// If `src` is larger than the remaining capacity of the buffer starting at
628    /// `offset`, a short write occurs: only the bytes that fit within the buffer
629    /// are written, and the returned value will be less than `src.len()`.
630    pub fn write_at(&mut self, mut offset: usize, src: &[u8]) -> usize {
631        let mut total = 0;
632
633        for slice in self.parts.iter_mut() {
634            let slice = slice.as_mut();
635            if offset < slice.len() {
636                let available = slice.len() - offset;
637                let to_copy = std::cmp::min(src.len() - total, available);
638                slice[offset..offset + to_copy].copy_from_slice(&src[total..total + to_copy]);
639                total += to_copy;
640                offset = 0;
641                if total == src.len() {
642                    break;
643                }
644            } else {
645                offset -= slice.len();
646            }
647        }
648        total
649    }
650}
651
652impl<T, K: AllocKind> BufferIO<T, K>
653where
654    T: AsRef<[u8]>,
655{
656    /// Reads data from the buffer starting at the specified `offset` into `dst`.
657    ///
658    /// This method is infallible. It returns the number of bytes successfully read.
659    ///
660    /// If the specified `offset` is greater than or equal to the total length of the
661    /// buffer, `0` bytes will be read.
662    ///
663    /// If the remaining data in the buffer starting at `offset` is less than the
664    /// size of `dst`, a short read occurs: only the available bytes are copied,
665    /// and the returned value will be less than `dst.len()`.
666    pub fn read_at(&self, mut offset: usize, dst: &mut [u8]) -> usize {
667        let mut total = 0;
668
669        for slice in self.parts.iter() {
670            let slice = slice.as_ref();
671            if offset < slice.len() {
672                let available = slice.len() - offset;
673                let to_copy = std::cmp::min(dst.len() - total, available);
674                dst[total..total + to_copy].copy_from_slice(&slice[offset..offset + to_copy]);
675                total += to_copy;
676                offset = 0;
677                if total == dst.len() {
678                    break;
679                }
680            } else {
681                offset -= slice.len();
682            }
683        }
684        total
685    }
686}
687
688impl AllocGuard<Rx> {
689    /// Turns a tx allocation into an rx one.
690    ///
691    /// To achieve this we have to convert the same amount of descriptors from
692    /// the tx pool to the rx pool to compensate for us being converted to tx
693    /// descriptors from rx ones.
694    async fn into_tx(mut self) -> AllocGuard<Tx> {
695        let mut tx = self.pool.alloc_tx(self.descs.len).await;
696        // [MaybeUninit<DescId<Tx>; 4] and [MaybeUninit<DescId<Rx>; 4] have the
697        // same memory layout because DescId is repr(transparent). So it is safe
698        // to transmute and swap the values between the storages. After the swap
699        // the drop implementation of self will return the descriptors back to
700        // rx pool.
701        std::mem::swap(&mut self.descs.storage, unsafe {
702            std::mem::transmute(&mut tx.descs.storage)
703        });
704        tx
705    }
706}
707
708/// A non-empty container that has at most [`netdev::MAX_DESCRIPTOR_CHAIN`] elements.
709struct Chained<T> {
710    storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
711    len: ChainLength,
712}
713
714impl<T> Deref for Chained<T> {
715    type Target = [T];
716
717    fn deref(&self) -> &Self::Target {
718        // Safety: `self.storage[..self.len]` is already initialized.
719        unsafe { std::mem::transmute(&self.storage[..self.len.into()]) }
720    }
721}
722
723impl<T> DerefMut for Chained<T> {
724    fn deref_mut(&mut self) -> &mut Self::Target {
725        // Safety: `self.storage[..self.len]` is already initialized.
726        unsafe { std::mem::transmute(&mut self.storage[..self.len.into()]) }
727    }
728}
729
730impl<T> Drop for Chained<T> {
731    fn drop(&mut self) {
732        // Safety: `self.deref_mut()` is a slice of all initialized elements.
733        unsafe {
734            std::ptr::drop_in_place(self.deref_mut());
735        }
736    }
737}
738
739impl<T: Debug> Debug for Chained<T> {
740    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
741        f.debug_list().entries(self.iter()).finish()
742    }
743}
744
745impl<T> Chained<T> {
746    #[allow(clippy::uninit_assumed_init)]
747    fn empty() -> Self {
748        // Create an uninitialized array of `MaybeUninit`. The `assume_init` is
749        // safe because the type we are claiming to have initialized here is a
750        // bunch of `MaybeUninit`s, which do not require initialization.
751        // TODO(https://fxbug.dev/42160423): use MaybeUninit::uninit_array once it
752        // is stablized.
753        // https://doc.rust-lang.org/std/mem/union.MaybeUninit.html#method.uninit_array
754        Self { storage: unsafe { MaybeUninit::uninit().assume_init() }, len: ChainLength::ZERO }
755    }
756}
757
758impl<T> FromIterator<T> for Chained<T> {
759    /// # Panics
760    ///
761    /// if the iterator is empty or the iterator can yield more than
762    ///  MAX_DESCRIPTOR_CHAIN elements.
763    fn from_iter<I: IntoIterator<Item = T>>(elements: I) -> Self {
764        let mut result = Self::empty();
765        let mut len = 0u8;
766        for (idx, e) in elements.into_iter().enumerate() {
767            result.storage[idx] = MaybeUninit::new(e);
768            len += 1;
769        }
770        assert!(len > 0);
771        // `len` can not be larger than `MAX_DESCRIPTOR_CHAIN`, otherwise we can't
772        // get here due to the bound checks on `result.storage`.
773        result.len = ChainLength::try_from(len).unwrap();
774        result
775    }
776}
777
778impl<T> IntoIterator for Chained<T> {
779    type Item = T;
780    type IntoIter = ChainedIter<T>;
781
782    fn into_iter(mut self) -> Self::IntoIter {
783        let len = self.len;
784        self.len = ChainLength::ZERO;
785        // Safety: we have reset the length to zero, it is now safe to move out
786        // the values and set them to be uninitialized. The `assume_init` is
787        // safe because the type we are claiming to have initialized here is a
788        // bunch of `MaybeUninit`s, which do not require initialization.
789        // TODO(https://fxbug.dev/42160423): use MaybeUninit::uninit_array once it
790        // is stablized.
791        #[allow(clippy::uninit_assumed_init)]
792        let storage =
793            std::mem::replace(&mut self.storage, unsafe { MaybeUninit::uninit().assume_init() });
794        ChainedIter { storage, len, consumed: 0 }
795    }
796}
797
/// An owning iterator over the elements of a [`Chained`].
///
/// Slots `consumed..len` of `storage` hold the elements that have not been
/// yielded yet.
struct ChainedIter<T> {
    /// Element storage moved out of the originating `Chained`.
    storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
    /// Number of slots, from the start of `storage`, that were initialized
    /// when the iterator was created.
    len: ChainLength,
    /// Number of elements already yielded (slots `..consumed`).
    consumed: u8,
}
803
804impl<T> Iterator for ChainedIter<T> {
805    type Item = T;
806
807    fn next(&mut self) -> Option<Self::Item> {
808        if self.consumed < self.len.get() {
809            // Safety: it is safe now to replace that slot with an uninitialized
810            // value because we will advance consumed by 1.
811            let value = unsafe {
812                std::mem::replace(
813                    &mut self.storage[usize::from(self.consumed)],
814                    MaybeUninit::uninit(),
815                )
816                .assume_init()
817            };
818            self.consumed += 1;
819            Some(value)
820        } else {
821            None
822        }
823    }
824
825    fn size_hint(&self) -> (usize, Option<usize>) {
826        let len = usize::from(self.len.get() - self.consumed);
827        (len, Some(len))
828    }
829}
830
// `size_hint` reports identical lower and upper bounds, so the default
// `ExactSizeIterator::len` is accurate.
impl<T> ExactSizeIterator for ChainedIter<T> {}
832
833impl<T> Drop for ChainedIter<T> {
834    fn drop(&mut self) {
835        // Safety: `self.storage[self.consumed..self.len]` is initialized.
836        unsafe {
837            std::ptr::drop_in_place(std::mem::transmute::<_, &mut [T]>(
838                &mut self.storage[self.consumed.into()..self.len.into()],
839            ));
840        }
841    }
842}
843
/// Guards the allocated descriptors; they will be freed when dropped.
///
/// Dropping a non-empty guard calls [`AllocKind::free`], which hands the
/// descriptors back to `pool`.
pub(in crate::session) struct AllocGuard<K: AllocKind> {
    /// The allocated descriptor chain, head descriptor first.
    descs: Chained<DescId<K>>,
    /// The pool the descriptors were allocated from and are returned to.
    pool: Arc<Pool>,
}
849
850impl<K: AllocKind> Debug for AllocGuard<K> {
851    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
852        let Self { descs, pool: _ } = self;
853        f.debug_struct("AllocGuard").field("descs", descs).finish()
854    }
855}
856
857impl<K: AllocKind> AllocGuard<K> {
858    fn new(descs: Chained<DescId<K>>, pool: Arc<Pool>) -> Self {
859        Self { descs, pool }
860    }
861
862    /// Iterates over references to the descriptors.
863    fn descriptors(&self) -> impl Iterator<Item = DescRef<'_, K>> + '_ {
864        self.descs.iter().map(move |desc| self.pool.descriptors.borrow(desc))
865    }
866
867    /// Iterates over mutable references to the descriptors.
868    fn descriptors_mut(&mut self) -> impl Iterator<Item = DescRefMut<'_, K>> + '_ {
869        let descriptors = &self.pool.descriptors;
870        self.descs.iter_mut().map(move |desc| descriptors.borrow_mut(desc))
871    }
872
873    /// Gets a reference to the head descriptor.
874    fn descriptor(&self) -> DescRef<'_, K> {
875        self.descriptors().next().expect("descriptors must not be empty")
876    }
877
878    /// Gets a mutable reference to the head descriptor.
879    fn descriptor_mut(&mut self) -> DescRefMut<'_, K> {
880        self.descriptors_mut().next().expect("descriptors must not be empty")
881    }
882}
883
impl AllocGuard<Tx> {
    /// Initializes descriptors of a tx allocation.
    ///
    /// We choose to enforce and satisfy the `min_tx_data` layout requirement
    /// (imposed by the device/driver) immediately during buffer allocation and
    /// initialization here.
    ///
    /// Consequently, the allocated buffer's capacity (`target_len`) may be
    /// larger than the `requested_bytes` if `requested_bytes` is smaller than
    /// `min_tx_data`.
    ///
    /// While this means we might spend CPU cycles zero-padding buffers that are
    /// subsequently dropped without being sent (a rare occurrence in typical
    /// usage), this guarantees that buffer is always suitable for sending. This
    /// also makes the transmit path (`Session::send`) infallible.
    ///
    /// Descriptors are initialized head first. Only the head descriptor
    /// reserves `min_tx_head` bytes, and only the last one reserves
    /// `min_tx_tail` bytes. Capacity beyond `max(requested_bytes, min_tx_data)`
    /// is added to a descriptor's tail so it cannot be written.
    ///
    /// # Errors
    ///
    /// Returns [`Error::TxLength`] if the unused capacity of a descriptor does
    /// not fit in its `u16` tail length.
    ///
    /// # Panics
    ///
    /// Panics if the chain's data capacity cannot hold
    /// `max(requested_bytes, min_tx_data)` bytes.
    fn init(&mut self, requested_bytes: usize) -> Result<()> {
        let len = self.len();
        let BufferLayout { min_tx_head, min_tx_tail, length: buffer_length, min_tx_data } =
            self.pool.buffer_layout;

        // The buffer must hold at least `min_tx_data` bytes even if fewer were
        // requested; the difference is zero-padded below.
        let target_len = requested_bytes.max(usize::from(min_tx_data));
        let mut remaining_target = target_len;
        let mut remaining_requested = requested_bytes;

        // `clen` counts the descriptors that follow the current one: it is
        // `len - 1` for the head descriptor and `0` for the last one.
        for (desc_id, clen) in self.descs.iter_mut().zip((0..len).rev()) {
            let chain_length = ChainLength::try_from(clen).unwrap();
            let head_length = if clen + 1 == len { min_tx_head } else { 0 };
            let mut tail_length = if clen == 0 { min_tx_tail } else { 0 };

            // The data region is what remains of the buffer after reserving
            // head_length and tail_length. The check was done when the config
            // for pool was created, so the subtraction won't overflow.
            let available_bytes =
                u32::try_from(buffer_length - usize::from(head_length) - usize::from(tail_length))
                    .unwrap();

            let data_length = match u32::try_from(remaining_target) {
                Ok(target) => {
                    if target < available_bytes {
                        // The target bytes are less than what is available,
                        // we need to put the excess in the tail so that the
                        // user cannot write more than they requested (or padded).
                        let excess = available_bytes - target;
                        tail_length = u16::try_from(excess)
                            .ok_checked::<TryFromIntError>()
                            .and_then(|tail_adjustment| tail_length.checked_add(tail_adjustment))
                            .ok_or(Error::TxLength)?;
                    }
                    target.min(available_bytes)
                }
                // More than `u32::MAX` bytes still to place: fill this part.
                Err(TryFromIntError { .. }) => available_bytes,
            };

            // Split this part's data between bytes the caller asked for and
            // padding added only to satisfy `min_tx_data`.
            let data_length_usize = usize::try_from(data_length).expect("u32 must fit in a usize");
            let requested_in_part = std::cmp::min(remaining_requested, data_length_usize);
            let pad_in_part = data_length_usize - requested_in_part;

            // Initialize the descriptor.
            {
                let mut descriptor = self.pool.descriptors.borrow_mut(desc_id);
                descriptor.initialize(chain_length, head_length, data_length, tail_length);
            }

            // Zero-pad any excess capacity in this buffer part that was allocated
            // to satisfy the `min_tx_data` layout requirement but not requested by
            // the caller.
            //
            // We decided to pad the buffer on initialization because the lazy commit
            // model can only avoid padding for the following 2 cases:
            // 1) User only allocates but never sends.
            // 2) User writes past their requested size and meets the min_tx_data
            //    requirement.
            // Both should be uncommon, and in case 2) we can fix the client by
            // requesting a larger size to avoid padding.
            if pad_in_part > 0 {
                let slice = self.pool.get_slice_mut(desc_id);
                slice[requested_in_part..requested_in_part + pad_in_part].fill(0);
            }

            remaining_target -= data_length_usize;
            remaining_requested -= requested_in_part;
        }
        // Every byte of the (possibly padded) target must have been placed.
        assert_eq!(remaining_target, 0);
        Ok(())
    }
}
969
970impl<K: AllocKind> Drop for AllocGuard<K> {
971    fn drop(&mut self) {
972        if self.is_empty() {
973            return;
974        }
975        K::free(private::Allocation(self));
976    }
977}
978
979impl<K: AllocKind> Deref for AllocGuard<K> {
980    type Target = [DescId<K>];
981
982    fn deref(&self) -> &Self::Target {
983        self.descs.deref()
984    }
985}
986
987impl<K: AllocKind> From<AllocGuard<K>> for Buffer<K> {
988    fn from(alloc: AllocGuard<K>) -> Self {
989        Self { alloc }
990    }
991}
992
993impl<T, K: AllocKind> Read for BufferIO<T, K>
994where
995    T: AsRef<[u8]>,
996{
997    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
998        let read_len = self.read_at(self.pos, buf);
999        self.pos += read_len;
1000        Ok(read_len)
1001    }
1002}
1003
1004impl<T> Write for BufferIO<T, Tx>
1005where
1006    T: AsMut<[u8]>,
1007{
1008    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
1009        let write_len = self.write_at(self.pos, buf);
1010        self.pos += write_len;
1011        Ok(write_len)
1012    }
1013
1014    fn flush(&mut self) -> std::io::Result<()> {
1015        Ok(())
1016    }
1017}
1018
1019impl<T, K: AllocKind> Seek for BufferIO<T, K> {
1020    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
1021        let pos = match pos {
1022            SeekFrom::Start(offset) => offset,
1023            SeekFrom::End(offset) => {
1024                let end = i64::try_from(self.len).unwrap();
1025                u64::try_from(end.wrapping_add(offset)).unwrap()
1026            }
1027            SeekFrom::Current(offset) => {
1028                let current = i64::try_from(self.pos).map_err(|TryFromIntError { .. }| {
1029                    std::io::Error::from(std::io::ErrorKind::InvalidInput)
1030                })?;
1031                u64::try_from(current.wrapping_add(offset)).unwrap()
1032            }
1033        };
1034        self.pos = usize::try_from(pos).map_err(|TryFromIntError { .. }| {
1035            std::io::Error::from(std::io::ErrorKind::InvalidInput)
1036        })?;
1037        Ok(pos)
1038    }
1039}
1040
/// A pending tx allocation request.
struct TxAllocReq {
    /// Delivers the fulfilled allocation to the waiting requester.
    sender: Sender<AllocGuard<Tx>>,
    /// Number of descriptors requested for the chain.
    size: ChainLength,
}
1046
1047impl TxAllocReq {
1048    fn new(size: ChainLength) -> (Self, Receiver<AllocGuard<Tx>>) {
1049        let (sender, receiver) = channel();
1050        (TxAllocReq { sender, size }, receiver)
1051    }
1052
1053    /// Fulfills the pending request with an `AllocGuard`.
1054    ///
1055    /// If the request is already closed, the guard is simply dropped and
1056    /// returned to the queue.
1057    ///
1058    /// `fulfill` must *not* be called when the `guard`'s pool is holding the tx
1059    /// lock, since we may deadlock/panic upon the double tx lock acquisition.
1060    fn fulfill(self, guard: AllocGuard<Tx>) {
1061        let Self { sender, size: _ } = self;
1062        match sender.send(guard) {
1063            Ok(()) => (),
1064            Err(guard) => {
1065                // It's ok to just drop the guard here, it'll be returned to the
1066                // pool.
1067                drop(guard);
1068            }
1069        }
1070    }
1071}
1072
/// A module for sealed traits so that users of this crate cannot implement
/// [`AllocKind`] for anything other than [`Rx`] and [`Tx`].
mod private {
    use super::{AllocKind, Rx, Tx};
    /// Sealing supertrait of [`AllocKind`]; implemented only for [`Rx`] and
    /// [`Tx`].
    pub trait Sealed: 'static + Sized {}
    impl Sealed for Rx {}
    impl Sealed for Tx {}

    // We can't leak a private type in a public trait, so we create an opaque
    // private newtype for &mut super::AllocGuard that can be mentioned in the
    // AllocKind trait.
    pub struct Allocation<'a, K: AllocKind>(pub(super) &'a mut super::AllocGuard<K>);
}
1086
/// An allocation can have one of two kinds; this trait provides a way to
/// project the kind type ([`Rx`] or [`Tx`]) into a value.
pub trait AllocKind: private::Sealed {
    /// The reflected value of Self.
    const REFL: AllocKindRefl;

    /// Frees an allocation of the given kind, handing its descriptors back to
    /// the pool it was allocated from.
    fn free(alloc: private::Allocation<'_, Self>);
}
1096
/// A tag type for Tx allocations. It has no values and is only used as a type
/// parameter.
pub enum Tx {}
/// A tag type for Rx allocations. It has no values and is only used as a type
/// parameter.
pub enum Rx {}
1101
/// The reflected value that allows inspection of an [`AllocKind`] type.
pub enum AllocKindRefl {
    /// Reflects [`Tx`].
    Tx,
    /// Reflects [`Rx`].
    Rx,
}
1107
1108impl AllocKindRefl {
1109    pub(in crate::session) fn as_str(&self) -> &'static str {
1110        match self {
1111            AllocKindRefl::Tx => "Tx",
1112            AllocKindRefl::Rx => "Rx",
1113        }
1114    }
1115}
1116
1117impl AllocKind for Tx {
1118    const REFL: AllocKindRefl = AllocKindRefl::Tx;
1119
1120    fn free(alloc: private::Allocation<'_, Self>) {
1121        let private::Allocation(AllocGuard { pool, descs }) = alloc;
1122        pool.free_tx(std::mem::replace(descs, Chained::empty()));
1123    }
1124}
1125
1126impl AllocKind for Rx {
1127    const REFL: AllocKindRefl = AllocKindRefl::Rx;
1128
1129    fn free(alloc: private::Allocation<'_, Self>) {
1130        let private::Allocation(AllocGuard { pool, descs }) = alloc;
1131        pool.free_rx(std::mem::replace(descs, Chained::empty()));
1132        pool.rx_leases.rx_complete();
1133    }
1134}
1135
/// An extracted struct containing state pertaining to watching rx leases.
pub(in crate::session) struct RxLeaseHandlingState {
    /// Whether an [`RxLeaseWatcher`] may still be created. It starts as the
    /// configured "leases enabled" flag and is cleared when a watcher is
    /// created.
    can_watch_rx_leases: AtomicBool,
    /// Keeps a rolling counter of received rx frames MINUS the target frame
    /// number of the current outstanding lease.
    ///
    /// When no leases are pending (via [`RxLeaseWatcher::wait_until`]),
    /// then this matches exactly the number of received frames.
    ///
    /// Otherwise, the lease is currently waiting for remaining `u64::MAX -
    /// rx_frame_counter` frames. The logic depends on `AtomicU64` wrapping
    /// around as part of completing rx buffers.
    rx_frame_counter: AtomicU64,
    /// Woken by `rx_complete` when `rx_frame_counter` wraps around to zero.
    rx_lease_waker: AtomicWaker,
}
1151
1152impl RxLeaseHandlingState {
1153    fn new_with_flags(flags: netdev::SessionFlags) -> Self {
1154        Self::new_with_enabled(flags.contains(netdev::SessionFlags::RECEIVE_RX_POWER_LEASES))
1155    }
1156
1157    fn new_with_enabled(enabled: bool) -> Self {
1158        Self {
1159            can_watch_rx_leases: AtomicBool::new(enabled),
1160            rx_frame_counter: AtomicU64::new(0),
1161            rx_lease_waker: AtomicWaker::new(),
1162        }
1163    }
1164
1165    /// Increments the total receive frame counter and possibly wakes up a
1166    /// waiting lease yielder.
1167    fn rx_complete(&self) {
1168        let Self { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } = self;
1169        let prev = rx_frame_counter.fetch_add(1, atomic::Ordering::SeqCst);
1170
1171        // See wait_until for details. We need to hit a waker whenever our add
1172        // wrapped the u64 back around to 0.
1173        if prev == u64::MAX {
1174            rx_lease_waker.wake();
1175        }
1176    }
1177}
1178
/// A trait allowing [`RxLeaseWatcher`] to be agnostic over how to get an
/// [`RxLeaseHandlingState`].
pub(in crate::session) trait RxLeaseHandlingStateContainer {
    /// Returns the lease-handling state held by (or reachable from) `self`.
    fn lease_handling_state(&self) -> &RxLeaseHandlingState;
}
1184
1185impl<T: Borrow<RxLeaseHandlingState>> RxLeaseHandlingStateContainer for T {
1186    fn lease_handling_state(&self) -> &RxLeaseHandlingState {
1187        self.borrow()
1188    }
1189}
1190
1191impl RxLeaseHandlingStateContainer for Arc<Pool> {
1192    fn lease_handling_state(&self) -> &RxLeaseHandlingState {
1193        &self.rx_leases
1194    }
1195}
1196
/// A type-safe wrapper around a single lease watcher per `Pool`.
pub(in crate::session) struct RxLeaseWatcher<T> {
    /// Gives access to the watched [`RxLeaseHandlingState`].
    state: T,
}
1201
impl<T: RxLeaseHandlingStateContainer> RxLeaseWatcher<T> {
    /// Creates a new lease watcher.
    ///
    /// # Panics
    ///
    /// Panics if an [`RxLeaseWatcher`] has already been created for the given
    /// pool or the pool was not configured for it.
    pub(in crate::session) fn new(state: T) -> Self {
        // Atomically claim the single watcher slot. `swap` returns the previous
        // flag, which is `false` if watching was never enabled or another
        // watcher already claimed it.
        assert!(
            state.lease_handling_state().can_watch_rx_leases.swap(false, atomic::Ordering::SeqCst),
            "can't watch rx leases"
        );
        Self { state }
    }

    /// Called by sessions to wait until `hold_until_frame` is fulfilled to
    /// yield leases out.
    ///
    /// The returned future completes once `hold_until_frame` rx buffers in
    /// total have been freed back to the pool. It completes immediately if that
    /// count has already been reached.
    ///
    /// Note that this method takes `&mut self` because only one
    /// [`RxLeaseWatcher`] may be created by lease handling state, and exclusive
    /// access to it is required to watch lease completion.
    pub(in crate::session) async fn wait_until(&mut self, hold_until_frame: u64) {
        // A note about wrap-arounds.
        //
        // We're assuming the frame counter will never wrap around for
        // correctness here. This should be fine, even assuming a packet
        // rate of 1 million pps it'd take almost 600k years for this counter
        // to wrap around:
        // - 2^64 / 1e6 / 60 / 60 / 24 / 365 ~ 584e3.

        let RxLeaseHandlingState { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } =
            self.state.lease_handling_state();

        // Bias the counter down by the target. `prev` is the number of frames
        // completed so far.
        let prev = rx_frame_counter.fetch_sub(hold_until_frame, atomic::Ordering::SeqCst);
        // After having subtracted the waiting value we *must always restore the
        // value* on return, even if the future is not polled to completion.
        let _guard = scopeguard::guard((), |()| {
            let _: u64 = rx_frame_counter.fetch_add(hold_until_frame, atomic::Ordering::SeqCst);
        });

        // Lease is ready to be fulfilled.
        if prev >= hold_until_frame {
            return;
        }
        // Threshold is a wrapped around subtraction. So now we must wait
        // until the read value from the atomic is LESS THAN the threshold.
        // That happens when the counter wraps past `u64::MAX` back to 0, i.e.
        // after `hold_until_frame - prev` more frames; `rx_complete` wakes the
        // waker at exactly that point.
        let threshold = prev.wrapping_sub(hold_until_frame);
        futures::future::poll_fn(|cx| {
            let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
            if v < threshold {
                return Poll::Ready(());
            }
            rx_lease_waker.register(cx.waker());
            // Check again after registering so a wrap-around that raced with
            // registration is not missed.
            let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
            if v < threshold {
                return Poll::Ready(());
            }
            Poll::Pending
        })
        .await;
    }
}
1266
1267#[cfg(test)]
1268mod tests {
1269
1270    use super::*;
1271
1272    use assert_matches::assert_matches;
1273    use fuchsia_async as fasync;
1274    use futures::future::FutureExt;
1275    use test_case::test_case;
1276
1277    use std::collections::HashSet;
1278    use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize};
1279    use std::pin::pin;
1280    use std::task::{Poll, Waker};
1281
1282    const DEFAULT_MIN_TX_BUFFER_HEAD: u16 = 4;
1283    const DEFAULT_MIN_TX_BUFFER_TAIL: u16 = 8;
1284    // Safety: These are safe because none of the values are zero.
1285    const DEFAULT_BUFFER_LENGTH: NonZeroUsize = NonZeroUsize::new(64).unwrap();
1286    const DEFAULT_TX_BUFFERS: NonZeroU16 = NonZeroU16::new(8).unwrap();
1287    const DEFAULT_RX_BUFFERS: NonZeroU16 = NonZeroU16::new(8).unwrap();
1288    const MAX_BUFFER_BYTES: usize = DEFAULT_BUFFER_LENGTH.get()
1289        * netdev::MAX_DESCRIPTOR_CHAIN as usize
1290        - DEFAULT_MIN_TX_BUFFER_HEAD as usize
1291        - DEFAULT_MIN_TX_BUFFER_TAIL as usize;
1292
1293    const SENTINEL_BYTE: u8 = 0xab;
1294    const WRITE_BYTE: u8 = 1;
1295    const PAD_BYTE: u8 = 0;
1296
1297    const DEFAULT_CONFIG: Config = Config {
1298        buffer_stride: NonZeroU64::new(DEFAULT_BUFFER_LENGTH.get() as u64).unwrap(),
1299        num_rx_buffers: DEFAULT_RX_BUFFERS,
1300        num_tx_buffers: DEFAULT_TX_BUFFERS,
1301        options: netdev::SessionFlags::empty(),
1302        buffer_layout: BufferLayout {
1303            length: DEFAULT_BUFFER_LENGTH.get(),
1304            min_tx_head: DEFAULT_MIN_TX_BUFFER_HEAD,
1305            min_tx_tail: DEFAULT_MIN_TX_BUFFER_TAIL,
1306            min_tx_data: 0,
1307        },
1308    };
1309
1310    impl Pool {
1311        fn new_test_default() -> Arc<Self> {
1312            let (pool, _descriptors, _data) =
1313                Pool::new(DEFAULT_CONFIG).expect("failed to create default pool");
1314            pool
1315        }
1316
1317        async fn alloc_tx_checked(self: &Arc<Self>, n: u8) -> AllocGuard<Tx> {
1318            self.alloc_tx(ChainLength::try_from(n).expect("failed to convert to chain length"))
1319                .await
1320        }
1321
1322        fn alloc_tx_now_or_never(self: &Arc<Self>, n: u8) -> Option<AllocGuard<Tx>> {
1323            self.alloc_tx_checked(n).now_or_never()
1324        }
1325
1326        fn alloc_tx_all(self: &Arc<Self>, n: u8) -> Vec<AllocGuard<Tx>> {
1327            std::iter::from_fn(|| self.alloc_tx_now_or_never(n)).collect()
1328        }
1329
1330        fn alloc_tx_buffer_now_or_never(self: &Arc<Self>, num_bytes: usize) -> Option<Buffer<Tx>> {
1331            self.alloc_tx_buffer(num_bytes)
1332                .now_or_never()
1333                .transpose()
1334                .expect("invalid arguments for alloc_tx_buffer")
1335        }
1336
1337        fn set_min_tx_buffer_length(self: &mut Arc<Self>, length: usize) {
1338            Arc::get_mut(self).unwrap().buffer_layout.min_tx_data = length;
1339        }
1340
1341        fn fill_sentinel_bytes(&mut self) {
1342            // Safety: We have mut reference to Pool, so we get to modify the
1343            // VMO pointed by self.base.
1344            unsafe { std::ptr::write_bytes(self.base.as_ptr(), SENTINEL_BYTE, self.bytes) };
1345        }
1346    }
1347
1348    impl Buffer<Tx> {
1349        // Write a byte at offset, the result buffer should be pad_size long, with
1350        // 0..offset being the SENTINEL_BYTE, offset being the WRITE_BYTE and the
1351        // rest being PAD_BYTE.
1352        fn check_write_and_pad(&mut self, offset: usize, pad_size: usize) {
1353            {
1354                let mut io = self.io_mut();
1355                assert_eq!(io.write_at(offset, &[WRITE_BYTE][..]), 1);
1356            }
1357            assert_eq!(self.len(), pad_size);
1358            // An arbitrary value that is not SENTINAL/WRITE/PAD_BYTE so that
1359            // we can make sure the write really happened.
1360            const INIT_BYTE: u8 = 42;
1361            let mut read_buf = vec![INIT_BYTE; pad_size];
1362            assert_eq!(self.io().read_at(0, &mut read_buf[..]), read_buf.len());
1363            for (idx, byte) in read_buf.iter().enumerate() {
1364                if idx < offset {
1365                    assert_eq!(*byte, SENTINEL_BYTE);
1366                } else if idx == offset {
1367                    assert_eq!(*byte, WRITE_BYTE);
1368                } else {
1369                    assert_eq!(*byte, PAD_BYTE);
1370                }
1371            }
1372        }
1373    }
1374
1375    impl<K, I, T> PartialEq<T> for Chained<DescId<K>>
1376    where
1377        K: AllocKind,
1378        I: ExactSizeIterator<Item = u16>,
1379        T: Copy + IntoIterator<IntoIter = I>,
1380    {
1381        fn eq(&self, other: &T) -> bool {
1382            let iter = other.into_iter();
1383            if usize::from(self.len) != iter.len() {
1384                return false;
1385            }
1386            self.iter().zip(iter).all(|(l, r)| l.get() == r)
1387        }
1388    }
1389
1390    impl Debug for TxAllocReq {
1391        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1392            let TxAllocReq { sender: _, size } = self;
1393            f.debug_struct("TxAllocReq").field("size", &size).finish_non_exhaustive()
1394        }
1395    }
1396
1397    #[test]
1398    fn alloc_tx_distinct() {
1399        let pool = Pool::new_test_default();
1400        let allocated = pool.alloc_tx_all(1);
1401        assert_eq!(allocated.len(), DEFAULT_TX_BUFFERS.get().into());
1402        let distinct = allocated
1403            .iter()
1404            .map(|alloc| {
1405                assert_eq!(alloc.descs.len(), 1);
1406                alloc.descs[0].get()
1407            })
1408            .collect::<HashSet<u16>>();
1409        assert_eq!(allocated.len(), distinct.len());
1410    }
1411
1412    #[test]
1413    fn alloc_tx_free_len() {
1414        let pool = Pool::new_test_default();
1415        {
1416            let allocated = pool.alloc_tx_all(2);
1417            assert_eq!(
1418                allocated.iter().fold(0, |acc, a| { acc + a.descs.len() }),
1419                DEFAULT_TX_BUFFERS.get().into()
1420            );
1421            assert_eq!(pool.tx_alloc_state.lock().free_list.len, 0);
1422        }
1423        assert_eq!(pool.tx_alloc_state.lock().free_list.len, DEFAULT_TX_BUFFERS.get());
1424    }
1425
1426    #[test]
1427    fn alloc_tx_chain() {
1428        let pool = Pool::new_test_default();
1429        let allocated = pool.alloc_tx_all(3);
1430        assert_eq!(allocated.len(), usize::from(DEFAULT_TX_BUFFERS.get()) / 3);
1431        assert_matches!(pool.alloc_tx_now_or_never(3), None);
1432        assert_matches!(pool.alloc_tx_now_or_never(2), Some(a) if a.descs.len() == 2);
1433    }
1434
1435    #[test]
1436    fn alloc_tx_many() {
1437        let pool = Pool::new_test_default();
1438        let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1439            - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1440            - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1441        let data_len = usize::try_from(data_len).unwrap();
1442        let mut buffers = pool
1443            .alloc_tx_buffers(data_len)
1444            .now_or_never()
1445            .expect("failed to alloc")
1446            .unwrap()
1447            // Collect into a vec so we keep the buffers alive, otherwise they
1448            // are immediately returned to the pool.
1449            .collect::<Result<Vec<_>>>()
1450            .expect("buffer error");
1451        assert_eq!(buffers.len(), DEFAULT_TX_BUFFERS.get().into());
1452
1453        // We have all the buffers, which means allocating more should not
1454        // resolve.
1455        assert!(pool.alloc_tx_buffers(data_len).now_or_never().is_none());
1456
1457        // If we release a single buffer we should be able to retrieve it again.
1458        assert_matches!(buffers.pop(), Some(_));
1459        let mut more_buffers =
1460            pool.alloc_tx_buffers(data_len).now_or_never().expect("failed to alloc").unwrap();
1461        let buffer = assert_matches!(more_buffers.next(), Some(Ok(b)) => b);
1462        assert_matches!(more_buffers.next(), None);
1463        // The iterator is fused, so None is yielded even after dropping the
1464        // buffer.
1465        drop(buffer);
1466        assert_matches!(more_buffers.next(), None);
1467    }
1468
1469    #[test]
1470    fn alloc_tx_after_free() {
1471        let pool = Pool::new_test_default();
1472        let mut allocated = pool.alloc_tx_all(1);
1473        assert_matches!(pool.alloc_tx_now_or_never(2), None);
1474        {
1475            let _drained = allocated.drain(..2);
1476        }
1477        assert_matches!(pool.alloc_tx_now_or_never(2), Some(a) if a.descs.len() == 2);
1478    }
1479
1480    #[test]
1481    fn blocking_alloc_tx() {
1482        let mut executor = fasync::TestExecutor::new();
1483        let pool = Pool::new_test_default();
1484        let mut allocated = pool.alloc_tx_all(1);
1485        let alloc_fut = pool.alloc_tx_checked(1);
1486        let mut alloc_fut = pin!(alloc_fut);
1487        // The allocation should block.
1488        assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
1489        // And the allocation request should be queued.
1490        assert!(!pool.tx_alloc_state.lock().requests.is_empty());
1491        let freed = allocated
1492            .pop()
1493            .expect("no fulfulled allocations")
1494            .iter()
1495            .map(|x| x.get())
1496            .collect::<Chained<_>>();
1497        let same_as_freed =
1498            |descs: &Chained<DescId<Tx>>| descs.iter().map(|x| x.get()).eq(freed.iter().copied());
1499        // Now the task should be able to continue.
1500        assert_matches!(
1501            &executor.run_until_stalled(&mut alloc_fut),
1502            Poll::Ready(AllocGuard{ descs, pool: _ }) if same_as_freed(descs)
1503        );
1504        // And the queued request should now be removed.
1505        assert!(pool.tx_alloc_state.lock().requests.is_empty());
1506    }
1507
1508    #[test]
1509    fn blocking_alloc_tx_cancel_before_free() {
1510        let mut executor = fasync::TestExecutor::new();
1511        let pool = Pool::new_test_default();
1512        let mut allocated = pool.alloc_tx_all(1);
1513        {
1514            let alloc_fut = pool.alloc_tx_checked(1);
1515            let mut alloc_fut = pin!(alloc_fut);
1516            assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
1517            assert_matches!(
1518                pool.tx_alloc_state.lock().requests.as_slices(),
1519                (&[ref req1, ref req2], &[]) if req1.size.get() == 1 && req2.size.get() == 1
1520            );
1521        }
1522        assert_matches!(
1523            allocated.pop(),
1524            Some(AllocGuard { ref descs, pool: ref p })
1525                if descs == &[DEFAULT_TX_BUFFERS.get() - 1] && Arc::ptr_eq(p, &pool)
1526        );
1527        let state = pool.tx_alloc_state.lock();
1528        assert_eq!(state.free_list.len, 1);
1529        assert!(state.requests.is_empty());
1530    }
1531
1532    #[test]
1533    fn blocking_alloc_tx_cancel_after_free() {
1534        let mut executor = fasync::TestExecutor::new();
1535        let pool = Pool::new_test_default();
1536        let mut allocated = pool.alloc_tx_all(1);
1537        {
1538            let alloc_fut = pool.alloc_tx_checked(1);
1539            let mut alloc_fut = pin!(alloc_fut);
1540            assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
1541            assert_matches!(
1542                pool.tx_alloc_state.lock().requests.as_slices(),
1543                (&[ref req1, ref req2], &[]) if req1.size.get() == 1 && req2.size.get() == 1
1544            );
1545            assert_matches!(
1546                allocated.pop(),
1547                Some(AllocGuard { ref descs, pool: ref p })
1548                    if descs == &[DEFAULT_TX_BUFFERS.get() - 1] && Arc::ptr_eq(p, &pool)
1549            );
1550        }
1551        let state = pool.tx_alloc_state.lock();
1552        assert_eq!(state.free_list.len, 1);
1553        assert!(state.requests.is_empty());
1554    }
1555
1556    #[test]
1557    fn multiple_blocking_alloc_tx_fulfill_order() {
1558        const TASKS_TOTAL: usize = 3;
1559        let mut executor = fasync::TestExecutor::new();
1560        let pool = Pool::new_test_default();
1561        let mut allocated = pool.alloc_tx_all(1);
1562        let mut alloc_futs = (1..=TASKS_TOTAL)
1563            .rev()
1564            .map(|x| {
1565                let pool = pool.clone();
1566                (x, Box::pin(async move { pool.alloc_tx_checked(x.try_into().unwrap()).await }))
1567            })
1568            .collect::<Vec<_>>();
1569
1570        for (idx, (req_size, task)) in alloc_futs.iter_mut().enumerate() {
1571            assert_matches!(executor.run_until_stalled(task), Poll::Pending);
1572            // assert that the tasks are sorted decreasing on the requested size.
1573            assert_eq!(idx + *req_size, TASKS_TOTAL);
1574        }
1575        {
1576            let state = pool.tx_alloc_state.lock();
1577            // The first pending request was introduced by `alloc_tx_all`.
1578            assert_eq!(state.requests.len(), TASKS_TOTAL + 1);
1579            let mut requests = state.requests.iter();
1580            // It should already be cancelled because the requesting future is
1581            // already dropped.
1582            assert!(requests.next().unwrap().sender.is_canceled());
1583            // The rest of the requests must not be cancelled.
1584            assert!(requests.all(|req| !req.sender.is_canceled()))
1585        }
1586
1587        let mut to_free = Vec::new();
1588        let mut freed = 0;
1589        for free_size in (1..=TASKS_TOTAL).rev() {
1590            let (_req_size, mut task) = alloc_futs.remove(0);
1591            for _ in 1..free_size {
1592                freed += 1;
1593                assert_matches!(
1594                    allocated.pop(),
1595                    Some(AllocGuard { ref descs, pool: ref p })
1596                        if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
1597                );
1598                assert_matches!(executor.run_until_stalled(&mut task), Poll::Pending);
1599            }
1600            freed += 1;
1601            assert_matches!(
1602                allocated.pop(),
1603                Some(AllocGuard { ref descs, pool: ref p })
1604                    if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
1605            );
1606            match executor.run_until_stalled(&mut task) {
1607                Poll::Ready(alloc) => {
1608                    assert_eq!(alloc.len(), free_size);
1609                    // Don't return the allocation to the pool now.
1610                    to_free.push(alloc);
1611                }
1612                Poll::Pending => panic!("The request should be fulfilled"),
1613            }
1614            // The rest of requests can not be fulfilled.
1615            for (_req_size, task) in alloc_futs.iter_mut() {
1616                assert_matches!(executor.run_until_stalled(task), Poll::Pending);
1617            }
1618        }
1619        assert!(pool.tx_alloc_state.lock().requests.is_empty());
1620    }
1621
1622    #[test]
1623    fn singleton_tx_layout() {
1624        let pool = Pool::new_test_default();
1625        let buffers = std::iter::from_fn(|| {
1626            let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1627                - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1628                - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1629            pool.alloc_tx_buffer_now_or_never(usize::try_from(data_len).unwrap()).map(|buffer| {
1630                assert_eq!(buffer.alloc.descriptors().count(), 1);
1631                let offset = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1632                    * u64::from(buffer.alloc[0].get());
1633                {
1634                    let descriptor = buffer.alloc.descriptor();
1635                    assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
1636                    assert_eq!(descriptor.head_length(), DEFAULT_MIN_TX_BUFFER_HEAD);
1637                    assert_eq!(descriptor.tail_length(), DEFAULT_MIN_TX_BUFFER_TAIL);
1638                    assert_eq!(descriptor.data_length(), data_len);
1639                    assert_eq!(descriptor.offset(), offset);
1640                }
1641
1642                {
1643                    let mut slices = buffer.parts();
1644                    let slice = slices.next().expect("should have one slice");
1645                    assert_matches!(slices.next(), None);
1646                    assert_eq!(slice.len(), usize::try_from(data_len).unwrap());
1647                    assert_eq!(
1648                        slice.as_ptr(),
1649                        pool.base.as_ptr().wrapping_add(
1650                            usize::try_from(offset).unwrap()
1651                                + usize::from(DEFAULT_MIN_TX_BUFFER_HEAD),
1652                        )
1653                    );
1654                }
1655                buffer
1656            })
1657        })
1658        .collect::<Vec<_>>();
1659        assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()));
1660    }
1661
1662    #[test]
1663    fn chained_tx_layout() {
1664        let pool = Pool::new_test_default();
1665        let alloc_len = 4 * DEFAULT_BUFFER_LENGTH.get()
1666            - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1667            - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1668        let buffers = std::iter::from_fn(|| {
1669            pool.alloc_tx_buffer_now_or_never(alloc_len).map(|buffer| {
1670                assert_eq!(buffer.parts().count(), 4);
1671                for (idx, (descriptor, slice)) in
1672                    buffer.alloc.descriptors().zip(buffer.parts()).enumerate()
1673                {
1674                    let chain_length = ChainLength::try_from(buffer.alloc.len() - idx - 1).unwrap();
1675                    let head_length = if idx == 0 { DEFAULT_MIN_TX_BUFFER_HEAD } else { 0 };
1676                    let tail_length = if chain_length == ChainLength::ZERO {
1677                        DEFAULT_MIN_TX_BUFFER_TAIL
1678                    } else {
1679                        0
1680                    };
1681                    let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1682                        - u32::from(head_length)
1683                        - u32::from(tail_length);
1684                    let offset = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1685                        * u64::from(buffer.alloc[idx].get());
1686                    assert_eq!(descriptor.chain_length().unwrap(), chain_length);
1687                    assert_eq!(descriptor.head_length(), head_length);
1688                    assert_eq!(descriptor.tail_length(), tail_length);
1689                    assert_eq!(descriptor.offset(), offset);
1690                    assert_eq!(descriptor.data_length(), data_len);
1691                    if chain_length != ChainLength::ZERO {
1692                        assert_eq!(descriptor.nxt(), Some(buffer.alloc[idx + 1].get()));
1693                    }
1694
1695                    assert_eq!(slice.len(), usize::try_from(data_len).unwrap());
1696                    assert_eq!(
1697                        slice.as_ptr(),
1698                        pool.base.as_ptr().wrapping_add(
1699                            usize::try_from(offset).unwrap() + usize::from(head_length),
1700                        )
1701                    );
1702                }
1703                buffer
1704            })
1705        })
1706        .collect::<Vec<_>>();
1707        assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()) / 4);
1708    }
1709
1710    #[test]
1711    fn rx_distinct() {
1712        let pool = Pool::new_test_default();
1713        let mut guard = pool.rx_pending.inner.lock();
1714        let (descs, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
1715        assert_eq!(descs.len(), usize::from(DEFAULT_RX_BUFFERS.get()));
1716        let distinct = descs.iter().map(|desc| desc.get()).collect::<HashSet<u16>>();
1717        assert_eq!(descs.len(), distinct.len());
1718    }
1719
1720    #[test]
1721    fn alloc_rx_layout() {
1722        let pool = Pool::new_test_default();
1723        let mut guard = pool.rx_pending.inner.lock();
1724        let (descs, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
1725        assert_eq!(descs.len(), usize::from(DEFAULT_RX_BUFFERS.get()));
1726        for desc in descs.iter() {
1727            let descriptor = pool.descriptors.borrow(desc);
1728            let offset =
1729                u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap() * u64::from(desc.get());
1730            assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
1731            assert_eq!(descriptor.head_length(), 0);
1732            assert_eq!(descriptor.tail_length(), 0);
1733            assert_eq!(descriptor.offset(), offset);
1734            assert_eq!(
1735                descriptor.data_length(),
1736                u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1737            );
1738        }
1739    }
1740
1741    #[test]
1742    fn buffer_read_at_write_at() {
1743        let pool = Pool::new_test_default();
1744        let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
1745        let mut buffer =
1746            pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
1747        // Because we have to accommodate the space for head and tail, there
1748        // would be 2 parts instead of 1.
1749        assert_eq!(buffer.parts().count(), 2);
1750        assert_eq!(buffer.len(), alloc_bytes);
1751        let write_buf = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();
1752        assert_eq!(buffer.io_mut().write_at(0, &write_buf[..]), write_buf.len());
1753        let mut read_buf = [0xff; DEFAULT_BUFFER_LENGTH.get()];
1754        assert_eq!(buffer.io().read_at(0, &mut read_buf[..]), read_buf.len());
1755        for (idx, byte) in read_buf.iter().enumerate() {
1756            assert_eq!(*byte, write_buf[idx]);
1757        }
1758    }
1759
1760    #[test]
1761    fn buffer_write_at_short() {
1762        let pool = Pool::new_test_default();
1763        let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
1764        let mut buffer =
1765            pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
1766        assert_eq!(buffer.parts().count(), 2);
1767        assert_eq!(buffer.len(), alloc_bytes);
1768
1769        let write_buf = vec![WRITE_BYTE; alloc_bytes + 10];
1770
1771        // Test short write (writing more than buffer capacity)
1772        assert_eq!(buffer.io_mut().write_at(0, &write_buf[..]), alloc_bytes);
1773
1774        // Verify short write
1775        let mut read_buf = vec![0; alloc_bytes];
1776        assert_eq!(buffer.io().read_at(0, &mut read_buf[..]), alloc_bytes);
1777        for byte in read_buf.iter() {
1778            assert_eq!(*byte, WRITE_BYTE);
1779        }
1780
1781        // Test write with offset past end
1782        assert_eq!(buffer.io_mut().write_at(alloc_bytes + 1, &write_buf[..]), 0);
1783
1784        // Test write with offset inside buffer but src extending past end
1785        let offset = alloc_bytes / 2;
1786        let expected_write = alloc_bytes - offset;
1787        let write_buf = vec![2; alloc_bytes]; // Different byte to distinguish
1788        assert_eq!(buffer.io_mut().write_at(offset, &write_buf[..]), expected_write);
1789
1790        // Verify the write
1791        let mut read_buf = vec![0; alloc_bytes];
1792        assert_eq!(buffer.io().read_at(0, &mut read_buf[..]), alloc_bytes);
1793        for (idx, byte) in read_buf.iter().enumerate() {
1794            if idx < offset {
1795                assert_eq!(*byte, WRITE_BYTE);
1796            } else {
1797                assert_eq!(*byte, 2);
1798            }
1799        }
1800    }
1801
1802    #[test]
1803    fn buffer_read_at_short() {
1804        let pool = Pool::new_test_default();
1805        let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
1806        let mut buffer =
1807            pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
1808        assert_eq!(buffer.parts().count(), 2);
1809        assert_eq!(buffer.len(), alloc_bytes);
1810
1811        let write_buf = vec![WRITE_BYTE; alloc_bytes];
1812        assert_eq!(buffer.io_mut().write_at(0, &write_buf[..]), alloc_bytes);
1813
1814        // Test short read (reading more than buffer capacity)
1815        let mut read_buf = vec![0xff; alloc_bytes + 10];
1816        assert_eq!(buffer.io().read_at(0, &mut read_buf[..]), alloc_bytes);
1817        for (idx, byte) in read_buf.iter().enumerate() {
1818            if idx < alloc_bytes {
1819                assert_eq!(*byte, WRITE_BYTE);
1820            } else {
1821                assert_eq!(*byte, 0xff);
1822            }
1823        }
1824
1825        // Test read with offset past end
1826        assert_eq!(buffer.io().read_at(alloc_bytes + 1, &mut read_buf[..]), 0);
1827
1828        // Test read with offset inside buffer but dst extending past end
1829        let offset = alloc_bytes / 2;
1830        let expected_read = alloc_bytes - offset;
1831        let mut read_buf = vec![0xff; alloc_bytes];
1832        assert_eq!(buffer.io().read_at(offset, &mut read_buf[..]), expected_read);
1833        for (idx, byte) in read_buf.iter().enumerate() {
1834            if idx < expected_read {
1835                assert_eq!(*byte, WRITE_BYTE);
1836            } else {
1837                assert_eq!(*byte, 0xff);
1838            }
1839        }
1840    }
1841
1842    #[test]
1843    fn buffer_read_write_seek() {
1844        let pool = Pool::new_test_default();
1845        let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
1846        let mut buffer =
1847            pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
1848        // Because we have to accommodate the space for head and tail, there
1849        // would be 2 parts instead of 1.
1850        assert_eq!(buffer.parts().count(), 2);
1851        assert_eq!(buffer.len(), alloc_bytes);
1852        let write_buf = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();
1853
1854        let mut io = buffer.io_mut();
1855
1856        assert_eq!(io.write(&write_buf[..]).expect("failed to write into buffer"), write_buf.len());
1857        const SEEK_FROM_END: usize = 64;
1858        const READ_LEN: usize = 12;
1859        assert_eq!(
1860            io.seek(SeekFrom::End(-i64::try_from(SEEK_FROM_END).unwrap())).unwrap(),
1861            u64::try_from(io.len - SEEK_FROM_END).unwrap()
1862        );
1863        let mut read_buf = [0xff; READ_LEN];
1864        assert_eq!(io.read(&mut read_buf[..]).expect("failed to read from buffer"), read_buf.len());
1865        assert_eq!(&write_buf[..READ_LEN], &read_buf[..]);
1866    }
1867
1868    #[test_case(32; "single buffer part")]
1869    #[test_case(MAX_BUFFER_BYTES; "multiple buffer parts")]
1870    fn buffer_pad(pad_size: usize) {
1871        let mut pool = Pool::new_test_default();
1872        pool.set_min_tx_buffer_length(pad_size);
1873        for offset in 0..pad_size {
1874            Arc::get_mut(&mut pool)
1875                .expect("there are multiple owners of the underlying VMO")
1876                .fill_sentinel_bytes();
1877            let mut buffer =
1878                pool.alloc_tx_buffer_now_or_never(offset + 1).expect("failed to allocate buffer");
1879            buffer.check_write_and_pad(offset, pad_size);
1880        }
1881    }
1882
1883    #[test]
1884    fn buffer_pad_grow() {
1885        const BUFFER_PARTS: u8 = 3;
1886        let mut pool = Pool::new_test_default();
1887        let pad_size = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
1888            * u32::from(BUFFER_PARTS)
1889            - u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1890            - u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
1891        pool.set_min_tx_buffer_length(pad_size.try_into().unwrap());
1892        for offset in 0..pad_size - u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap() {
1893            Arc::get_mut(&mut pool)
1894                .expect("there are multiple owners of the underlying VMO")
1895                .fill_sentinel_bytes();
1896            let mut alloc =
1897                pool.alloc_tx_now_or_never(BUFFER_PARTS).expect("failed to alloc descriptors");
1898            alloc
1899                .init(usize::try_from(offset).unwrap() + 1)
1900                .expect("head/body/tail sizes are representable with u16/u32/u16");
1901            let mut buffer = Buffer::try_from(alloc).unwrap();
1902            buffer.check_write_and_pad(offset.try_into().unwrap(), pad_size.try_into().unwrap());
1903        }
1904    }
1905
1906    #[test_case(  0; "writes at the beginning")]
1907    #[test_case( 15; "writes in the first part")]
1908    #[test_case( 75; "writes in the second part")]
1909    #[test_case(135; "writes in the third part")]
1910    #[test_case(195; "writes in the last part")]
1911    fn buffer_used(write_offset: usize) {
1912        let pool = Pool::new_test_default();
1913        let mut buffer =
1914            pool.alloc_tx_buffer_now_or_never(MAX_BUFFER_BYTES).expect("failed to allocate buffer");
1915        let expected_caps = (0..netdev::MAX_DESCRIPTOR_CHAIN).map(|i| {
1916            if i == 0 {
1917                DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
1918            } else if i < netdev::MAX_DESCRIPTOR_CHAIN - 1 {
1919                DEFAULT_BUFFER_LENGTH.get()
1920            } else {
1921                DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL)
1922            }
1923        });
1924        assert_eq!(buffer.alloc.len(), netdev::MAX_DESCRIPTOR_CHAIN.into());
1925        assert_eq!(buffer.io_mut().write_at(write_offset, &[WRITE_BYTE][..]), 1);
1926        // The accumulator is Some if we haven't found the part where the byte
1927        // was written, None if we've already found it.
1928        assert_eq!(
1929            buffer.parts().zip(expected_caps).fold(
1930                Some(write_offset),
1931                |offset, (slice, expected_cap)| {
1932                    assert_eq!(slice.len(), expected_cap);
1933                    match offset {
1934                        Some(offset) => {
1935                            if offset >= expected_cap {
1936                                Some(offset - slice.len())
1937                            } else {
1938                                assert_eq!(slice[offset], WRITE_BYTE);
1939                                None
1940                            }
1941                        }
1942                        None => None,
1943                    }
1944                }
1945            ),
1946            None
1947        );
1948    }
1949
1950    #[test]
1951    fn allocate_under_device_minimum() {
1952        const MIN_TX_DATA: usize = 32;
1953        const ALLOC_SIZE: usize = 16;
1954        const WRITE_BYTE: u8 = 0xff;
1955        const WRITE_SENTINAL_BYTE: u8 = 0xee;
1956        const READ_SENTINAL_BYTE: u8 = 0xdd;
1957        let mut config = DEFAULT_CONFIG;
1958        config.buffer_layout.min_tx_data = MIN_TX_DATA;
1959        let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create a new pool");
1960        for mut buffer in Vec::from_iter(std::iter::from_fn({
1961            let pool = pool.clone();
1962            move || pool.alloc_tx_buffer_now_or_never(MIN_TX_DATA)
1963        })) {
1964            assert_eq!(
1965                buffer.io_mut().write_at(0, &[WRITE_SENTINAL_BYTE; MIN_TX_DATA]),
1966                MIN_TX_DATA
1967            );
1968        }
1969        let mut allocated =
1970            pool.alloc_tx_buffer_now_or_never(16).expect("failed to allocate buffer");
1971        assert_eq!(allocated.len(), MIN_TX_DATA);
1972        const WRITE_BUF_SIZE: usize = MIN_TX_DATA + 1;
1973        assert_eq!(allocated.io_mut().write_at(0, &[WRITE_BYTE; WRITE_BUF_SIZE]), MIN_TX_DATA);
1974        assert_eq!(allocated.io_mut().write_at(0, &[WRITE_BYTE; ALLOC_SIZE]), ALLOC_SIZE);
1975        assert_eq!(allocated.len(), MIN_TX_DATA);
1976        const READ_BUF_SIZE: usize = MIN_TX_DATA + 1;
1977        let mut read_buf = [READ_SENTINAL_BYTE; READ_BUF_SIZE];
1978        assert_eq!(allocated.io().read_at(0, &mut read_buf[..]), MIN_TX_DATA);
1979        assert_eq!(allocated.io().read_at(0, &mut read_buf[..MIN_TX_DATA]), MIN_TX_DATA);
1980        assert_eq!(&read_buf[..ALLOC_SIZE], &[WRITE_BYTE; ALLOC_SIZE][..]);
1981        assert_eq!(&read_buf[ALLOC_SIZE..MIN_TX_DATA], &[WRITE_BYTE; ALLOC_SIZE][..]);
1982        assert_eq!(&read_buf[MIN_TX_DATA..], &[READ_SENTINAL_BYTE; 1][..]);
1983    }
1984
1985    #[test]
1986    fn invalid_tx_length() {
1987        let mut config = DEFAULT_CONFIG;
1988        config.buffer_layout.length = usize::from(u16::MAX) + 2;
1989        config.buffer_layout.min_tx_head = 0;
1990        let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create pool");
1991        assert_matches!(pool.alloc_tx_buffer(1).now_or_never(), Some(Err(Error::TxLength)));
1992    }
1993
1994    #[test]
1995    fn rx_leases() {
1996        let mut executor = fuchsia_async::TestExecutor::new();
1997        let state = RxLeaseHandlingState::new_with_enabled(true);
1998        let mut watcher = RxLeaseWatcher { state: &state };
1999
2000        {
2001            let mut fut = pin!(watcher.wait_until(0));
2002            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
2003        }
2004        {
2005            state.rx_complete();
2006            let mut fut = pin!(watcher.wait_until(1));
2007            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
2008        }
2009        {
2010            let mut fut = pin!(watcher.wait_until(0));
2011            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
2012        }
2013        {
2014            let mut fut = pin!(watcher.wait_until(3));
2015            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
2016            state.rx_complete();
2017            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
2018            state.rx_complete();
2019            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
2020        }
2021        // Dropping the wait future without seeing it complete restores the
2022        // value.
2023        let counter_before = state.rx_frame_counter.load(atomic::Ordering::SeqCst);
2024        {
2025            let mut fut = pin!(watcher.wait_until(10000));
2026            assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
2027        }
2028        let counter_after = state.rx_frame_counter.load(atomic::Ordering::SeqCst);
2029        assert_eq!(counter_before, counter_after);
2030    }
2031}