netdevice_client/session/
mod.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Fuchsia netdevice session.
6
7mod buffer;
8
9use std::fmt::Debug;
10use std::mem::MaybeUninit;
11use std::num::{NonZeroU16, NonZeroU32, NonZeroU64, TryFromIntError};
12use std::ops::Range;
13use std::pin::Pin;
14use std::sync::atomic::{self, AtomicUsize};
15use std::sync::Arc;
16use std::task::Waker;
17
18use explicit::{PollExt as _, ResultExt as _};
19use fidl_fuchsia_hardware_network::DelegatedRxLease;
20use fidl_table_validation::ValidFidlTable;
21use fuchsia_sync::Mutex;
22use futures::future::{poll_fn, Future};
23use futures::task::{Context, Poll};
24use futures::{ready, Stream};
25use {fidl_fuchsia_hardware_network as netdev, fuchsia_async as fasync};
26
27use crate::error::{Error, Result};
28use buffer::pool::{Pool, RxLeaseWatcher};
29use buffer::{
30    AllocKind, DescId, NETWORK_DEVICE_DESCRIPTOR_LENGTH, NETWORK_DEVICE_DESCRIPTOR_VERSION,
31};
32pub use buffer::{Buffer, Rx, Tx};
33
34/// A session between network device client and driver.
35#[derive(Clone)]
36pub struct Session {
37    inner: Arc<Inner>,
38}
39
40impl Debug for Session {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        let Self { inner } = self;
43        let Inner {
44            name,
45            pool: _,
46            proxy: _,
47            rx: _,
48            tx: _,
49            tx_pending: _,
50            rx_ready: _,
51            tx_ready: _,
52            tx_idle_listeners: _,
53        } = &**inner;
54        f.debug_struct("Session").field("name", &name).finish_non_exhaustive()
55    }
56}
57
58impl Session {
59    /// Creates a new session with the given `name` and `config`.
60    pub async fn new(
61        device: &netdev::DeviceProxy,
62        name: &str,
63        config: Config,
64    ) -> Result<(Self, Task)> {
65        let inner = Inner::new(device, name, config).await?;
66        Ok((Session { inner: Arc::clone(&inner) }, Task { inner }))
67    }
68
69    /// Sends a [`Buffer`] to the network device in this session.
70    pub fn send(&self, buffer: Buffer<Tx>) -> Result<()> {
71        self.inner.send(buffer)
72    }
73
74    /// Receives a [`Buffer`] from the network device in this session.
75    pub async fn recv(&self) -> Result<Buffer<Rx>> {
76        self.inner.recv().await
77    }
78
79    /// Allocates a [`Buffer`] that may later be queued to the network device.
80    ///
81    /// The returned buffer will have at least `num_bytes` as size.
82    pub async fn alloc_tx_buffer(&self, num_bytes: usize) -> Result<Buffer<Tx>> {
83        self.inner.pool.alloc_tx_buffer(num_bytes).await
84    }
85
86    /// Waits for at least one TX buffer to be available and returns an iterator
87    /// of buffers with `num_bytes` as capacity.
88    ///
89    /// The returned iterator is guaranteed to yield at least one item (though
90    /// it might be an error if the requested size cannot meet the device
91    /// requirement).
92    ///
93    /// # Note
94    ///
95    /// Given a `Buffer<Tx>` is returned to the pool when it's dropped, the
96    /// returned iterator will seemingly yield infinite items if the yielded
97    /// `Buffer`s are dropped while iterating.
98    pub async fn alloc_tx_buffers(
99        &self,
100        num_bytes: usize,
101    ) -> Result<impl Iterator<Item = Result<Buffer<Tx>>> + '_> {
102        self.inner.pool.alloc_tx_buffers(num_bytes).await
103    }
104
105    /// Attaches [`Session`] to a port.
106    pub async fn attach(&self, port: Port, rx_frames: &[netdev::FrameType]) -> Result<()> {
107        // NB: Need to bind the future returned by `proxy.attach` to a variable
108        // otherwise this function's (`Session::attach`) returned future becomes
109        // not `Send` and we get unexpected compiler errors at a distance.
110        //
111        // The dyn borrow in the signature of `proxy.attach` seems to be the
112        // cause of the compiler's confusion.
113        let fut = self.inner.proxy.attach(&port.into(), rx_frames);
114        let () = fut.await?.map_err(|raw| Error::Attach(port, zx::Status::from_raw(raw)))?;
115        Ok(())
116    }
117
118    /// Detaches a port from the [`Session`].
119    pub async fn detach(&self, port: Port) -> Result<()> {
120        let () = self
121            .inner
122            .proxy
123            .detach(&port.into())
124            .await?
125            .map_err(|raw| Error::Detach(port, zx::Status::from_raw(raw)))?;
126        Ok(())
127    }
128
129    /// Blocks until there are no more tx buffers in flight to the backing
130    /// device.
131    ///
132    /// Note that this method does not prevent new buffers from being allocated
133    /// and sent, it is up to the caller to prevent any races. This future will
134    /// resolve as soon as it observes a tx idle event. That is, there are no
135    /// frames in flight to the backing device at all and the session currently
136    /// owns all allocated tx buffers.
137    ///
138    /// The synchronization guarantee provided by this method is that any
139    /// buffers previously given to [`Session::send`] will be accounted as
140    /// pending until the device has replied back.
141    pub async fn wait_tx_idle(&self) {
142        self.inner.tx_idle_listeners.wait().await;
143    }
144
145    /// Returns a stream of delegated rx leases from the device.
146    ///
147    /// Leases are yielded from the stream whenever the corresponding receive
148    /// buffer is dropped or reused for tx, which marks the end of processing
149    /// the marked buffer for the delegated lease.
150    ///
151    /// See [`fidl_fuchsia_hardware_network::DelegatedRxLease`] for more
152    /// details.
153    ///
154    /// # Panics
155    ///
156    /// Panics if the session was not created with
157    /// [`fidl_fuchsia_hardware_network::SessionFlags::RECEIVE_RX_POWER_LEASES`]
158    /// or if `watch_rx_leases` has already been called for this session.
159    pub fn watch_rx_leases(&self) -> impl Stream<Item = Result<RxLease>> + Send + Sync {
160        let inner = Arc::clone(&self.inner);
161        let watcher = RxLeaseWatcher::new(Arc::clone(&inner.pool));
162        futures::stream::try_unfold((inner, watcher), |(inner, mut watcher)| async move {
163            let DelegatedRxLease {
164                hold_until_frame,
165                handle,
166                __source_breaking: fidl::marker::SourceBreaking,
167            } = match inner.proxy.watch_delegated_rx_lease().await {
168                Ok(lease) => lease,
169                Err(e) => {
170                    if e.is_closed() {
171                        return Ok(None);
172                    } else {
173                        return Err(Error::Fidl(e));
174                    }
175                }
176            };
177            let hold_until_frame = hold_until_frame.ok_or(Error::InvalidLease)?;
178            let handle = RxLease { handle: handle.ok_or(Error::InvalidLease)? };
179
180            watcher.wait_until(hold_until_frame).await;
181            Ok(Some((handle, (inner, watcher))))
182        })
183    }
184}
185
186struct Inner {
187    pool: Arc<Pool>,
188    proxy: netdev::SessionProxy,
189    name: String,
190    rx: fasync::Fifo<DescId<Rx>>,
191    tx: fasync::Fifo<DescId<Tx>>,
192    // Pending tx descriptors to be sent.
193    tx_pending: Pending<Tx>,
194    rx_ready: Mutex<ReadyBuffer<DescId<Rx>>>,
195    tx_ready: Mutex<ReadyBuffer<DescId<Tx>>>,
196    tx_idle_listeners: TxIdleListeners,
197}
198
199impl Inner {
200    /// Creates a new session.
201    async fn new(device: &netdev::DeviceProxy, name: &str, config: Config) -> Result<Arc<Self>> {
202        let (pool, descriptors, data) = Pool::new(config)?;
203
204        let session_info = {
205            // The following two constants are not provided by user, panic
206            // instead of returning an error.
207            let descriptor_length =
208                u8::try_from(NETWORK_DEVICE_DESCRIPTOR_LENGTH / std::mem::size_of::<u64>())
209                    .expect("descriptor length in 64-bit words not representable by u8");
210            netdev::SessionInfo {
211                descriptors: Some(descriptors),
212                data: Some(data),
213                descriptor_version: Some(NETWORK_DEVICE_DESCRIPTOR_VERSION),
214                descriptor_length: Some(descriptor_length),
215                descriptor_count: Some(config.num_tx_buffers.get() + config.num_rx_buffers.get()),
216                options: Some(config.options),
217                ..Default::default()
218            }
219        };
220
221        let (client, netdev::Fifos { rx, tx }) = device
222            .open_session(name, session_info)
223            .await?
224            .map_err(|raw| Error::Open(name.to_owned(), zx::Status::from_raw(raw)))?;
225        let proxy = client.into_proxy();
226        let rx = fasync::Fifo::from_fifo(rx);
227        let tx = fasync::Fifo::from_fifo(tx);
228
229        Ok(Arc::new(Self {
230            pool,
231            proxy,
232            name: name.to_owned(),
233            rx,
234            tx,
235            tx_pending: Pending::new(Vec::new()),
236            rx_ready: Mutex::new(ReadyBuffer::new(config.num_rx_buffers.get().into())),
237            tx_ready: Mutex::new(ReadyBuffer::new(config.num_tx_buffers.get().into())),
238            tx_idle_listeners: TxIdleListeners::new(),
239        }))
240    }
241
242    /// Polls to submit available rx descriptors from pool to driver.
243    ///
244    /// Returns the number of rx descriptors that are submitted.
245    fn poll_submit_rx(&self, cx: &mut Context<'_>) -> Poll<Result<usize>> {
246        self.pool.rx_pending.poll_submit(&self.rx, cx)
247    }
248
249    /// Polls completed rx descriptors from the driver.
250    ///
251    /// Returns the the head of a completed rx descriptor chain.
252    fn poll_complete_rx(&self, cx: &mut Context<'_>) -> Poll<Result<DescId<Rx>>> {
253        let mut rx_ready = self.rx_ready.lock();
254        rx_ready.poll_with_fifo(cx, &self.rx).map_err(|status| Error::Fifo("read", "rx", status))
255    }
256
257    /// Polls to submit tx descriptors that are pending to the driver.
258    ///
259    /// Returns the number of tx descriptors that are successfully submitted.
260    fn poll_submit_tx(&self, cx: &mut Context<'_>) -> Poll<Result<usize>> {
261        self.tx_pending.poll_submit(&self.tx, cx)
262    }
263
264    /// Polls completed tx descriptors from the driver then puts them in pool.
265    fn poll_complete_tx(&self, cx: &mut Context<'_>) -> Poll<Result<()>> {
266        let result = {
267            let mut tx_ready = self.tx_ready.lock();
268            // TODO(https://github.com/rust-lang/rust/issues/63569): Provide entire
269            // chain of completed descriptors to the pool at once when slice of
270            // MaybeUninit is stabilized.
271            tx_ready.poll_with_fifo(cx, &self.tx).map(|r| match r {
272                Ok(desc) => self.pool.tx_completed(desc),
273                Err(status) => Err(Error::Fifo("read", "tx", status)),
274            })
275        };
276
277        match &result {
278            Poll::Ready(Ok(())) => self.tx_idle_listeners.tx_complete(),
279            Poll::Pending | Poll::Ready(Err(_)) => {}
280        }
281        result
282    }
283
284    /// Sends the [`Buffer`] to the driver.
285    fn send(&self, mut buffer: Buffer<Tx>) -> Result<()> {
286        buffer.pad()?;
287        buffer.commit();
288        self.tx_idle_listeners.tx_started();
289        self.tx_pending.extend(std::iter::once(buffer.leak()));
290        Ok(())
291    }
292
293    /// Receives a [`Buffer`] from the driver.
294    ///
295    /// Waits until there is completed rx buffers from the driver.
296    async fn recv(&self) -> Result<Buffer<Rx>> {
297        poll_fn(|cx| -> Poll<Result<Buffer<Rx>>> {
298            let head = ready!(self.poll_complete_rx(cx))?;
299            Poll::Ready(self.pool.rx_completed(head))
300        })
301        .await
302    }
303}
304
305/// The backing task that drives the session.
306///
307/// A session will stop making progress if this task is not polled continuously.
308#[must_use = "futures do nothing unless you `.await` or poll them"]
309pub struct Task {
310    inner: Arc<Inner>,
311}
312
313impl Future for Task {
314    type Output = Result<()>;
315    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
316        let inner = &Pin::into_inner(self).inner;
317        loop {
318            let mut all_pending = true;
319            // TODO(https://fxbug.dev/42158458): poll once for all completed
320            // descriptors if this becomes a performance bottleneck.
321            while inner.poll_complete_tx(cx)?.is_ready_checked::<()>() {
322                all_pending = false;
323            }
324            if inner.poll_submit_rx(cx)?.is_ready_checked::<usize>() {
325                all_pending = false;
326            }
327            if inner.poll_submit_tx(cx)?.is_ready_checked::<usize>() {
328                all_pending = false;
329            }
330            if all_pending {
331                return Poll::Pending;
332            }
333        }
334    }
335}
336
337/// Session configuration.
338#[derive(Debug, Clone, Copy)]
339pub struct Config {
340    /// Buffer stride on VMO, in bytes.
341    buffer_stride: NonZeroU64,
342    /// Number of rx descriptors to allocate.
343    num_rx_buffers: NonZeroU16,
344    /// Number of tx descriptors to allocate.
345    num_tx_buffers: NonZeroU16,
346    /// Session flags.
347    options: netdev::SessionFlags,
348    /// Buffer layout.
349    buffer_layout: BufferLayout,
350}
351
352/// Describes the buffer layout that [`Pool`] needs to know.
353#[derive(Debug, Clone, Copy)]
354struct BufferLayout {
355    /// Minimum tx buffer data length.
356    min_tx_data: usize,
357    /// Minimum tx buffer head length.
358    min_tx_head: u16,
359    /// Minimum tx buffer tail length.
360    min_tx_tail: u16,
361    /// The length of a buffer.
362    length: usize,
363}
364
365/// Network device base info with all required fields.
366#[derive(Debug, Clone, ValidFidlTable)]
367#[fidl_table_src(netdev::DeviceBaseInfo)]
368#[fidl_table_strict]
369pub struct DeviceBaseInfo {
370    /// Maximum number of items in rx FIFO (per session).
371    pub rx_depth: u16,
372    /// Maximum number of items in tx FIFO (per session).
373    pub tx_depth: u16,
374    /// Alignment requirement for buffers in the data VMO.
375    pub buffer_alignment: u32,
376    /// Maximum supported length of buffers in the data VMO, in bytes.
377    #[fidl_field_type(optional)]
378    pub max_buffer_length: Option<NonZeroU32>,
379    /// The minimum rx buffer length required for device.
380    pub min_rx_buffer_length: u32,
381    /// The minimum tx buffer length required for the device.
382    pub min_tx_buffer_length: u32,
383    /// The number of bytes the device requests be free as `head` space in a tx buffer.
384    pub min_tx_buffer_head: u16,
385    /// The amount of bytes the device requests be free as `tail` space in a tx buffer.
386    pub min_tx_buffer_tail: u16,
387    /// Maximum descriptor chain length accepted by the device.
388    pub max_buffer_parts: u8,
389    /// Available rx acceleration flags for this device.
390    #[fidl_field_type(default)]
391    pub rx_accel: Vec<netdev::RxAcceleration>,
392    /// Available tx acceleration flags for this device.
393    #[fidl_field_type(default)]
394    pub tx_accel: Vec<netdev::TxAcceleration>,
395}
396
397/// Network device information with all required fields.
398#[derive(Debug, Clone, ValidFidlTable)]
399#[fidl_table_src(netdev::DeviceInfo)]
400#[fidl_table_strict]
401pub struct DeviceInfo {
402    /// Minimum descriptor length, in 64-bit words.
403    pub min_descriptor_length: u8,
404    /// Accepted descriptor version.
405    pub descriptor_version: u8,
406    /// Device base info.
407    pub base_info: DeviceBaseInfo,
408}
409
410/// Basic session configuration that can be given to [`DeviceInfo`] to generate
411/// [`Config`]s.
412#[derive(Debug, Copy, Clone)]
413pub struct DerivableConfig {
414    /// The desired default buffer length for the session.
415    pub default_buffer_length: usize,
416    /// Create a primary session.
417    pub primary: bool,
418    /// Enable rx lease watching.
419    pub watch_rx_leases: bool,
420}
421
422impl DerivableConfig {
423    /// A sensibly common default buffer length to be used in
424    /// [`DerivableConfig`]. Provided to ease test writing.
425    ///
426    /// Chosen to be the next power of two after the default Ethernet MTU.
427    ///
428    /// This is the value of the buffer length in the `Default` impl.
429    pub const DEFAULT_BUFFER_LENGTH: usize = 2048;
430    /// The value returned by the `Default` impl.
431    pub const DEFAULT: Self = Self {
432        default_buffer_length: Self::DEFAULT_BUFFER_LENGTH,
433        primary: true,
434        watch_rx_leases: false,
435    };
436}
437
438impl Default for DerivableConfig {
439    fn default() -> Self {
440        Self::DEFAULT
441    }
442}
443
444impl DeviceInfo {
445    /// Create a new session config from the device information.
446    ///
447    /// This method also does the boundary checks so that data_length/offset fields read
448    /// from descriptors are safe to convert to [`usize`].
449    pub fn make_config(&self, config: DerivableConfig) -> Result<Config> {
450        let DeviceInfo {
451            min_descriptor_length,
452            descriptor_version,
453            base_info:
454                DeviceBaseInfo {
455                    rx_depth,
456                    tx_depth,
457                    buffer_alignment,
458                    max_buffer_length,
459                    min_rx_buffer_length,
460                    min_tx_buffer_length,
461                    min_tx_buffer_head,
462                    min_tx_buffer_tail,
463                    max_buffer_parts: _,
464                    rx_accel: _,
465                    tx_accel: _,
466                },
467        } = self;
468        if NETWORK_DEVICE_DESCRIPTOR_VERSION != *descriptor_version {
469            return Err(Error::Config(format!(
470                "descriptor version mismatch: {} != {}",
471                NETWORK_DEVICE_DESCRIPTOR_VERSION, descriptor_version
472            )));
473        }
474        if NETWORK_DEVICE_DESCRIPTOR_LENGTH < usize::from(*min_descriptor_length) {
475            return Err(Error::Config(format!(
476                "descriptor length too small: {} < {}",
477                NETWORK_DEVICE_DESCRIPTOR_LENGTH, min_descriptor_length
478            )));
479        }
480
481        let DerivableConfig { default_buffer_length, primary, watch_rx_leases } = config;
482
483        let num_rx_buffers =
484            NonZeroU16::new(*rx_depth).ok_or_else(|| Error::Config("no RX buffers".to_owned()))?;
485        let num_tx_buffers =
486            NonZeroU16::new(*tx_depth).ok_or_else(|| Error::Config("no TX buffers".to_owned()))?;
487
488        let max_buffer_length = max_buffer_length
489            .and_then(|max| {
490                // The error case is the case where max_buffer_length can't fix in a
491                // usize, but we use it to compare it to usizes, so that's
492                // equivalent to no limit.
493                usize::try_from(max.get()).ok_checked::<TryFromIntError>()
494            })
495            .unwrap_or(usize::MAX);
496        let min_buffer_length = usize::try_from(*min_rx_buffer_length)
497            .ok_checked::<TryFromIntError>()
498            .unwrap_or(usize::MAX);
499
500        let buffer_length =
501            usize::min(max_buffer_length, usize::max(min_buffer_length, default_buffer_length));
502
503        let buffer_alignment = usize::try_from(*buffer_alignment).map_err(
504            |std::num::TryFromIntError { .. }| {
505                Error::Config(format!(
506                    "buffer_alignment not representable within usize: {}",
507                    buffer_alignment,
508                ))
509            },
510        )?;
511
512        let buffer_stride = buffer_length
513            .checked_add(buffer_alignment - 1)
514            .map(|x| x / buffer_alignment * buffer_alignment)
515            .ok_or_else(|| {
516                Error::Config(format!(
517                    "not possible to align {} to {} under usize::MAX",
518                    buffer_length, buffer_alignment,
519                ))
520            })?;
521
522        if buffer_stride < buffer_length {
523            return Err(Error::Config(format!(
524                "buffer stride too small {} < {}",
525                buffer_stride, buffer_length
526            )));
527        }
528
529        if buffer_length < usize::from(*min_tx_buffer_head) + usize::from(*min_tx_buffer_tail) {
530            return Err(Error::Config(format!(
531                "buffer length {} does not meet minimum tx buffer head/tail requirement {}/{}",
532                buffer_length, min_tx_buffer_head, min_tx_buffer_tail,
533            )));
534        }
535
536        let num_buffers =
537            rx_depth.checked_add(*tx_depth).filter(|num| *num != u16::MAX).ok_or_else(|| {
538                Error::Config(format!(
539                    "too many buffers requested: {} + {} > u16::MAX",
540                    rx_depth, tx_depth
541                ))
542            })?;
543
544        let buffer_stride =
545            u64::try_from(buffer_stride).map_err(|std::num::TryFromIntError { .. }| {
546                Error::Config(format!("buffer_stride too big: {} > u64::MAX", buffer_stride))
547            })?;
548
549        // This is following the practice of rust stdlib to ensure allocation
550        // size never reaches isize::MAX.
551        // https://doc.rust-lang.org/std/primitive.pointer.html#method.add-1.
552        match buffer_stride.checked_mul(num_buffers.into()).map(isize::try_from) {
553            None | Some(Err(std::num::TryFromIntError { .. })) => {
554                return Err(Error::Config(format!(
555                    "too much memory required for the buffers: {} * {} > isize::MAX",
556                    buffer_stride, num_buffers
557                )))
558            }
559            Some(Ok(_total)) => (),
560        };
561
562        let buffer_stride = NonZeroU64::new(buffer_stride)
563            .ok_or_else(|| Error::Config("buffer_stride is zero".to_owned()))?;
564
565        let min_tx_data = match usize::try_from(*min_tx_buffer_length)
566            .map(|min_tx| (min_tx <= buffer_length).then_some(min_tx))
567        {
568            Ok(Some(min_tx_buffer_length)) => min_tx_buffer_length,
569            // Either the conversion or the comparison failed.
570            Ok(None) | Err(std::num::TryFromIntError { .. }) => {
571                return Err(Error::Config(format!(
572                    "buffer_length smaller than minimum TX requirement: {} < {}",
573                    buffer_length, *min_tx_buffer_length
574                )));
575            }
576        };
577
578        let mut options = netdev::SessionFlags::empty();
579        options.set(netdev::SessionFlags::PRIMARY, primary);
580        options.set(netdev::SessionFlags::RECEIVE_RX_POWER_LEASES, watch_rx_leases);
581
582        Ok(Config {
583            buffer_stride,
584            num_rx_buffers,
585            num_tx_buffers,
586            options,
587            buffer_layout: BufferLayout {
588                length: buffer_length,
589                min_tx_head: *min_tx_buffer_head,
590                min_tx_tail: *min_tx_buffer_tail,
591                min_tx_data,
592            },
593        })
594    }
595}
596
597/// A port of the device.
598#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
599pub struct Port {
600    pub(crate) base: u8,
601    pub(crate) salt: u8,
602}
603
604impl TryFrom<netdev::PortId> for Port {
605    type Error = Error;
606    fn try_from(netdev::PortId { base, salt }: netdev::PortId) -> Result<Self> {
607        if base <= netdev::MAX_PORTS {
608            Ok(Self { base, salt })
609        } else {
610            Err(Error::InvalidPortId(base))
611        }
612    }
613}
614
615impl From<Port> for netdev::PortId {
616    fn from(Port { base, salt }: Port) -> Self {
617        Self { base, salt }
618    }
619}
620
621/// Pending descriptors to be sent to driver.
622struct Pending<K: AllocKind> {
623    inner: Mutex<(Vec<DescId<K>>, Option<Waker>)>,
624}
625
626impl<K: AllocKind> Pending<K> {
627    fn new(descs: Vec<DescId<K>>) -> Self {
628        Self { inner: Mutex::new((descs, None)) }
629    }
630
631    /// Extends the pending descriptors buffer.
632    fn extend(&self, descs: impl IntoIterator<Item = DescId<K>>) {
633        let mut guard = self.inner.lock();
634        let (storage, waker) = &mut *guard;
635        storage.extend(descs);
636        if let Some(waker) = waker.take() {
637            waker.wake();
638        }
639    }
640
641    /// Submits the pending buffer to the driver through [`zx::Fifo`].
642    ///
643    /// It will return [`Poll::Pending`] if any of the following happens:
644    ///   - There are no descriptors pending.
645    ///   - The fifo is not ready for write.
646    fn poll_submit(
647        &self,
648        fifo: &fasync::Fifo<DescId<K>>,
649        cx: &mut Context<'_>,
650    ) -> Poll<Result<usize>> {
651        let mut guard = self.inner.lock();
652        let (storage, waker) = &mut *guard;
653        if storage.is_empty() {
654            *waker = Some(cx.waker().clone());
655            return Poll::Pending;
656        }
657
658        // TODO(https://fxbug.dev/42107145): We're assuming that writing to the
659        // FIFO here is a sufficient memory barrier for the other end to access
660        // the data. That is currently true but not really guaranteed by the
661        // API.
662        let submitted = ready!(fifo.try_write(cx, &storage[..]))
663            .map_err(|status| Error::Fifo("write", K::REFL.as_str(), status))?;
664        let _drained = storage.drain(0..submitted);
665        Poll::Ready(Ok(submitted))
666    }
667}
668
669/// An intermediary buffer used to reduce syscall overhead by acting as a proxy
670/// to read entries from a FIFO.
671///
672/// `ReadyBuffer` caches read entries from a FIFO in pre-allocated memory,
673/// allowing different batch sizes between what is acquired from the FIFO and
674/// what's processed by the caller.
675struct ReadyBuffer<T> {
676    // NB: A vector of `MaybeUninit` here allows us to give a transparent memory
677    // layout to the FIFO object but still move objects out of our buffer
678    // without needing a `T: Default` implementation. There's a small added
679    // benefit of not paying for memory initialization on creation as well, but
680    // that's mostly negligible given all allocation is performed upfront.
681    data: Vec<MaybeUninit<T>>,
682    available: Range<usize>,
683}
684
685impl<T> Drop for ReadyBuffer<T> {
686    fn drop(&mut self) {
687        let Self { data, available } = self;
688        for initialized in &mut data[available.clone()] {
689            // SAFETY: the available range keeps track of initialized buffers,
690            // we must drop them on drop to uphold `MaybeUninit` expectations.
691            unsafe { initialized.assume_init_drop() }
692        }
693        *available = 0..0;
694    }
695}
696
697impl<T> ReadyBuffer<T> {
698    fn new(capacity: usize) -> Self {
699        let data = std::iter::from_fn(|| Some(MaybeUninit::uninit())).take(capacity).collect();
700        Self { data, available: 0..0 }
701    }
702
703    fn poll_with_fifo(
704        &mut self,
705        cx: &mut Context<'_>,
706        fifo: &fuchsia_async::Fifo<T>,
707    ) -> Poll<std::result::Result<T, zx::Status>>
708    where
709        T: fasync::FifoEntry,
710    {
711        let Self { data, available: Range { start, end } } = self;
712
713        loop {
714            // Always pop from available data first.
715            if *start != *end {
716                let desc = std::mem::replace(&mut data[*start], MaybeUninit::uninit());
717                *start += 1;
718                // SAFETY: Descriptor was in the initialized section, it was
719                // initialized.
720                let desc = unsafe { desc.assume_init() };
721                return Poll::Ready(Ok(desc));
722            }
723            // Fetch more from the FIFO.
724            let count = ready!(fifo.try_read(cx, &mut data[..]))?;
725            *start = 0;
726            *end = count;
727        }
728    }
729}
730
731struct TxIdleListeners {
732    event: event_listener::Event,
733    tx_in_flight: AtomicUsize,
734}
735
736impl TxIdleListeners {
737    fn new() -> Self {
738        Self { event: event_listener::Event::new(), tx_in_flight: AtomicUsize::new(0) }
739    }
740
741    /// Decreases the number of outstanding tx buffers by 1.
742    ///
743    /// Notifies any tx idle listeners if the number reaches 0.
744    fn tx_complete(&self) {
745        let Self { event, tx_in_flight } = self;
746        let old_value = tx_in_flight.fetch_sub(1, atomic::Ordering::SeqCst);
747        debug_assert_ne!(old_value, 0);
748        if old_value == 1 {
749            let _notified: usize = event.notify(usize::MAX);
750        }
751    }
752
753    /// Increases the number of outstanding tx buffers by 1.
754    fn tx_started(&self) {
755        let Self { event: _, tx_in_flight } = self;
756        let _: usize = tx_in_flight.fetch_add(1, atomic::Ordering::SeqCst);
757    }
758
759    async fn wait(&self) {
760        let Self { event, tx_in_flight } = self;
761        // This is _the correct way_ of holding an `event_listener::Listener`.
762        // We check the condition before installing the listener in the fast
763        // case, then we must check the condition again after creating the
764        // listener in case we've raced with the condition updating. Finally we
765        // must loop and check the condition again because we're not fully
766        // guaranteed to not have spurious wakeups.
767        //
768        // See the event_listener crate documentation for more details.
769        loop {
770            if tx_in_flight.load(atomic::Ordering::SeqCst) == 0 {
771                return;
772            }
773
774            event_listener::listener!(event => listener);
775
776            if tx_in_flight.load(atomic::Ordering::SeqCst) == 0 {
777                return;
778            }
779
780            listener.await;
781        }
782    }
783}
784
785/// An RAII lease possibly keeping the system from suspension.
786///
787/// Yielded from [`Session::watch_rx_leases`].
788///
789/// Dropping an `RxLease` relinquishes it.
790#[derive(Debug)]
791pub struct RxLease {
792    handle: netdev::DelegatedRxLeaseHandle,
793}
794
795impl Drop for RxLease {
796    fn drop(&mut self) {
797        let Self { handle } = self;
798        // Change detector in case we need any evolution on how to relinquish
799        // leases.
800        match handle {
801            netdev::DelegatedRxLeaseHandle::Channel(_channel) => {
802                // Dropping the channel is enough to relinquish the lease.
803            }
804            netdev::DelegatedRxLeaseHandle::__SourceBreaking { .. } => {}
805        }
806    }
807}
808
809impl RxLease {
810    /// Peeks the internal lease.
811    pub fn inner(&self) -> &netdev::DelegatedRxLeaseHandle {
812        &self.handle
813    }
814}
815
816#[cfg(test)]
817mod tests {
818    use std::num::NonZeroU32;
819    use std::ops::Deref;
820    use std::sync::Arc;
821    use std::task::Poll;
822
823    use assert_matches::assert_matches;
824    use fuchsia_async::Fifo;
825    use test_case::test_case;
826    use zerocopy::{FromBytes, Immutable, IntoBytes};
827    use zx::{AsHandleRef as _, HandleBased as _};
828
829    use crate::session::DerivableConfig;
830
831    use super::buffer::{
832        AllocKind, DescId, NETWORK_DEVICE_DESCRIPTOR_LENGTH, NETWORK_DEVICE_DESCRIPTOR_VERSION,
833    };
834    use super::{
835        BufferLayout, Config, DeviceBaseInfo, DeviceInfo, Error, Inner, Mutex, Pending, Pool,
836        ReadyBuffer, Task, TxIdleListeners,
837    };
838
839    const DEFAULT_DEVICE_BASE_INFO: DeviceBaseInfo = DeviceBaseInfo {
840        rx_depth: 1,
841        tx_depth: 1,
842        buffer_alignment: 1,
843        max_buffer_length: None,
844        min_rx_buffer_length: 0,
845        min_tx_buffer_head: 0,
846        min_tx_buffer_length: 0,
847        min_tx_buffer_tail: 0,
848        max_buffer_parts: fidl_fuchsia_hardware_network::MAX_DESCRIPTOR_CHAIN,
849        rx_accel: Vec::new(),
850        tx_accel: Vec::new(),
851    };
852
853    const DEFAULT_DEVICE_INFO: DeviceInfo = DeviceInfo {
854        min_descriptor_length: 0,
855        descriptor_version: 1,
856        base_info: DEFAULT_DEVICE_BASE_INFO,
857    };
858
859    const DEFAULT_BUFFER_LENGTH: usize = 2048;
860
861    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
862        min_descriptor_length: u8::MAX,
863        ..DEFAULT_DEVICE_INFO
864    }, format!("descriptor length too small: {} < {}", NETWORK_DEVICE_DESCRIPTOR_LENGTH, u8::MAX))]
865    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
866        descriptor_version: 42,
867        ..DEFAULT_DEVICE_INFO
868    }, format!("descriptor version mismatch: {} != {}", NETWORK_DEVICE_DESCRIPTOR_VERSION, 42))]
869    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
870        base_info: DeviceBaseInfo {
871            tx_depth: 0,
872            ..DEFAULT_DEVICE_BASE_INFO
873        },
874        ..DEFAULT_DEVICE_INFO
875    }, "no TX buffers")]
876    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
877        base_info: DeviceBaseInfo {
878            rx_depth: 0,
879            ..DEFAULT_DEVICE_BASE_INFO
880        },
881        ..DEFAULT_DEVICE_INFO
882    }, "no RX buffers")]
883    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
884        base_info: DeviceBaseInfo {
885            tx_depth: u16::MAX,
886            rx_depth: u16::MAX,
887            ..DEFAULT_DEVICE_BASE_INFO
888        },
889        ..DEFAULT_DEVICE_INFO
890    }, format!("too many buffers requested: {} + {} > u16::MAX", u16::MAX, u16::MAX))]
891    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
892        base_info: DeviceBaseInfo {
893            min_tx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 + 1,
894            ..DEFAULT_DEVICE_BASE_INFO
895        },
896        ..DEFAULT_DEVICE_INFO
897    }, format!(
898        "buffer_length smaller than minimum TX requirement: {} < {}",
899        DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
900    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
901        base_info: DeviceBaseInfo {
902            min_tx_buffer_head: DEFAULT_BUFFER_LENGTH as u16 + 1,
903            ..DEFAULT_DEVICE_BASE_INFO
904        },
905        ..DEFAULT_DEVICE_INFO
906    }, format!(
907        "buffer length {} does not meet minimum tx buffer head/tail requirement {}/0",
908        DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
909    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
910        base_info: DeviceBaseInfo {
911            min_tx_buffer_tail: DEFAULT_BUFFER_LENGTH as u16 + 1,
912            ..DEFAULT_DEVICE_BASE_INFO
913        },
914        ..DEFAULT_DEVICE_INFO
915    }, format!(
916        "buffer length {} does not meet minimum tx buffer head/tail requirement 0/{}",
917        DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
918    #[test_case(0, DEFAULT_DEVICE_INFO, "buffer_stride is zero")]
919    #[test_case(usize::MAX, DEFAULT_DEVICE_INFO,
920    format!(
921        "too much memory required for the buffers: {} * {} > isize::MAX",
922        usize::MAX, 2))]
923    #[test_case(usize::MAX, DeviceInfo {
924        base_info: DeviceBaseInfo {
925            buffer_alignment: 2,
926            ..DEFAULT_DEVICE_BASE_INFO
927        },
928        ..DEFAULT_DEVICE_INFO
929    }, format!(
930        "not possible to align {} to {} under usize::MAX",
931        usize::MAX, 2))]
932    fn configs_from_device_info_err(
933        buffer_length: usize,
934        info: DeviceInfo,
935        expected: impl Deref<Target = str>,
936    ) {
937        let config = DerivableConfig { default_buffer_length: buffer_length, ..Default::default() };
938        assert_matches!(
939            info.make_config(config),
940            Err(Error::Config(got)) if got.as_str() == expected.deref()
941        );
942    }
943
944    #[test_case(DeviceInfo {
945        base_info: DeviceBaseInfo {
946            min_rx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 + 1,
947            ..DEFAULT_DEVICE_BASE_INFO
948        },
949        ..DEFAULT_DEVICE_INFO
950    }, DEFAULT_BUFFER_LENGTH + 1; "default below min")]
951    #[test_case(DeviceInfo {
952        base_info: DeviceBaseInfo {
953            max_buffer_length: NonZeroU32::new(DEFAULT_BUFFER_LENGTH as u32 - 1),
954            ..DEFAULT_DEVICE_BASE_INFO
955        },
956        ..DEFAULT_DEVICE_INFO
957    }, DEFAULT_BUFFER_LENGTH - 1; "default above max")]
958    #[test_case(DeviceInfo {
959        base_info: DeviceBaseInfo {
960            min_rx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 - 1,
961            max_buffer_length: NonZeroU32::new(DEFAULT_BUFFER_LENGTH as u32 + 1),
962            ..DEFAULT_DEVICE_BASE_INFO
963        },
964        ..DEFAULT_DEVICE_INFO
965    }, DEFAULT_BUFFER_LENGTH; "default in bounds")]
966    fn configs_from_device_buffer_length(info: DeviceInfo, expected_length: usize) {
967        let config = info
968            .make_config(DerivableConfig {
969                default_buffer_length: DEFAULT_BUFFER_LENGTH,
970                ..Default::default()
971            })
972            .expect("is valid");
973        let Config {
974            buffer_layout: BufferLayout { length, min_tx_data: _, min_tx_head: _, min_tx_tail: _ },
975            buffer_stride: _,
976            num_rx_buffers: _,
977            num_tx_buffers: _,
978            options: _,
979        } = config;
980        assert_eq!(length, expected_length);
981    }
982
983    fn make_fifos<K: AllocKind>() -> (Fifo<DescId<K>>, zx::Fifo<DescId<K>>) {
984        let (handle, other_end) = zx::Fifo::create(1).unwrap();
985        (Fifo::from_fifo(handle), other_end)
986    }
987
988    fn remove_rights<T: FromBytes + IntoBytes + Immutable>(
989        fifo: Fifo<T>,
990        rights_to_remove: zx::Rights,
991    ) -> Fifo<T> {
992        let fifo = zx::Fifo::from(fifo);
993        let rights = fifo.as_handle_ref().basic_info().expect("can retrieve info").rights;
994
995        let fifo = fifo.replace_handle(rights ^ rights_to_remove).expect("can replace");
996        Fifo::from_fifo(fifo)
997    }
998
999    enum TxOrRx {
1000        Tx,
1001        Rx,
1002    }
1003    #[test_case(TxOrRx::Tx, zx::Rights::READ; "tx read")]
1004    #[test_case(TxOrRx::Tx, zx::Rights::WRITE; "tx write")]
1005    #[test_case(TxOrRx::Rx, zx::Rights::WRITE; "rx read")]
1006    #[fuchsia_async::run_singlethreaded(test)]
1007    async fn task_as_future_poll_error(which_fifo: TxOrRx, right_to_remove: zx::Rights) {
1008        // This is a regression test for https://fxbug.dev/42072513. The flake
1009        // that caused that bug occurred because the Zircon channel was closed
1010        // but the error returned by a failed attempt to write to it wasn't
1011        // being propagated upwards. This test produces a similar situation by
1012        // altering the right on the FIFOs the task uses so as to cause either
1013        // an attempt to write or to read to fail. For completeness, it
1014        // exercises all the FIFO polls that comprise Task::poll.
1015        let (pool, _descriptors, _data) = Pool::new(
1016            DEFAULT_DEVICE_INFO
1017                .make_config(DerivableConfig {
1018                    default_buffer_length: DEFAULT_BUFFER_LENGTH,
1019                    ..Default::default()
1020                })
1021                .expect("is valid"),
1022        )
1023        .expect("is valid");
1024        let (session_proxy, _session_server) =
1025            fidl::endpoints::create_proxy::<fidl_fuchsia_hardware_network::SessionMarker>();
1026
1027        let (rx, _rx_sender) = make_fifos();
1028        let (tx, _tx_receiver) = make_fifos();
1029
1030        // Attenuate rights on one of the FIFOs.
1031        let (tx, rx) = match which_fifo {
1032            TxOrRx::Tx => (remove_rights(tx, right_to_remove), rx),
1033            TxOrRx::Rx => (tx, remove_rights(rx, right_to_remove)),
1034        };
1035
1036        let buf = pool.alloc_tx_buffer(1).await.expect("can allocate");
1037        let inner = Arc::new(Inner {
1038            pool,
1039            proxy: session_proxy,
1040            name: "fake_task".to_string(),
1041            rx,
1042            tx,
1043            tx_pending: Pending::new(vec![]),
1044            rx_ready: Mutex::new(ReadyBuffer::new(10)),
1045            tx_ready: Mutex::new(ReadyBuffer::new(10)),
1046            tx_idle_listeners: TxIdleListeners::new(),
1047        });
1048
1049        inner.send(buf).expect("can send");
1050
1051        let mut task = Task { inner };
1052
1053        // The task should not be able to continue because it can't read from or
1054        // write to one of the FIFOs.
1055        assert_matches!(futures::poll!(&mut task), Poll::Ready(Err(Error::Fifo(_, _, _))));
1056    }
1057}