Skip to main content

netdevice_client/session/
mod.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Fuchsia netdevice session.
6
7mod buffer;
8
9use std::fmt::Debug;
10use std::mem::MaybeUninit;
11use std::num::{NonZeroU16, NonZeroU32, NonZeroU64, TryFromIntError};
12use std::ops::Range;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::sync::atomic::{self, AtomicUsize};
16use std::task::Waker;
17
18use explicit::{PollExt as _, ResultExt as _};
19use fidl_fuchsia_hardware_network as netdev;
20use fidl_fuchsia_hardware_network::DelegatedRxLease;
21use fidl_table_validation::ValidFidlTable;
22use fuchsia_async as fasync;
23use fuchsia_sync::Mutex;
24use futures::future::{Future, poll_fn};
25use futures::task::{Context, Poll};
26use futures::{Stream, StreamExt as _, ready};
27
28use crate::error::{Error, Result};
29use buffer::pool::{Pool, RxLeaseWatcher};
30use buffer::{
31    AllocKind, DescId, NETWORK_DEVICE_DESCRIPTOR_LENGTH, NETWORK_DEVICE_DESCRIPTOR_VERSION,
32};
33pub use buffer::{Buffer, Rx, SinglePartTxBuffer, Tx};
34
35/// A session between network device client and driver.
36#[derive(Clone)]
37pub struct Session {
38    inner: Arc<Inner>,
39}
40
41impl Debug for Session {
42    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43        let Self { inner } = self;
44        let Inner {
45            name,
46            pool: _,
47            proxy: _,
48            rx: _,
49            tx: _,
50            tx_pending: _,
51            rx_ready: _,
52            tx_ready: _,
53            tx_idle_listeners: _,
54        } = &**inner;
55        f.debug_struct("Session").field("name", &name).finish_non_exhaustive()
56    }
57}
58
59impl Session {
60    /// Creates a new session with the given `name` and `config`.
61    pub async fn new(
62        device: &netdev::DeviceProxy,
63        name: &str,
64        config: Config,
65    ) -> Result<(Self, Task)> {
66        let inner = Inner::new(device, name, config).await?;
67        Ok((Session { inner: Arc::clone(&inner) }, Task { inner }))
68    }
69
70    /// Sends a [`Buffer`] to the network device in this session.
71    pub fn send(&self, buffer: Buffer<Tx>) {
72        self.inner.send(buffer)
73    }
74
75    /// Receives a [`Buffer`] from the network device in this session.
76    pub async fn recv(&self) -> Result<Buffer<Rx>> {
77        self.inner.recv().await
78    }
79
80    /// Allocates a [`Buffer`] that may later be queued to the network device.
81    ///
82    /// The returned buffer will have at least `num_bytes` as size.
83    pub async fn alloc_tx_buffer(&self, num_bytes: usize) -> Result<Buffer<Tx>> {
84        self.inner.pool.alloc_tx_buffer(num_bytes).await
85    }
86
87    /// Tries to allocate a [`SinglePartTxBuffer`].
88    ///
89    /// Returns `Ok(None)` if there is no available buffer, or `Err(Error::TxLength)`
90    /// if the requested size cannot meet the device requirement.
91    pub fn try_alloc_single_part_tx_buffer(
92        &self,
93        num_bytes: usize,
94    ) -> Result<Option<SinglePartTxBuffer>> {
95        self.inner.pool.try_alloc_single_part_tx_buffer(num_bytes)
96    }
97
98    /// Waits for at least one TX buffer to be available and returns an iterator
99    /// of buffers with `num_bytes` as capacity.
100    ///
101    /// The returned iterator is guaranteed to yield at least one item (though
102    /// it might be an error if the requested size cannot meet the device
103    /// requirement).
104    ///
105    /// # Note
106    ///
107    /// Given a `Buffer<Tx>` is returned to the pool when it's dropped, the
108    /// returned iterator will seemingly yield infinite items if the yielded
109    /// `Buffer`s are dropped while iterating.
110    pub async fn alloc_tx_buffers(
111        &self,
112        num_bytes: usize,
113    ) -> Result<impl Iterator<Item = Result<Buffer<Tx>>> + '_> {
114        self.inner.pool.alloc_tx_buffers(num_bytes).await
115    }
116
117    /// Attaches [`Session`] to a port.
118    pub async fn attach(&self, port: Port, rx_frames: &[netdev::FrameType]) -> Result<()> {
119        // NB: Need to bind the future returned by `proxy.attach` to a variable
120        // otherwise this function's (`Session::attach`) returned future becomes
121        // not `Send` and we get unexpected compiler errors at a distance.
122        //
123        // The dyn borrow in the signature of `proxy.attach` seems to be the
124        // cause of the compiler's confusion.
125        let fut = self.inner.proxy.attach(&port.into(), rx_frames);
126        let () = fut.await?.map_err(|raw| Error::Attach(port, zx::Status::from_raw(raw)))?;
127        Ok(())
128    }
129
130    /// Detaches a port from the [`Session`].
131    pub async fn detach(&self, port: Port) -> Result<()> {
132        let () = self
133            .inner
134            .proxy
135            .detach(&port.into())
136            .await?
137            .map_err(|raw| Error::Detach(port, zx::Status::from_raw(raw)))?;
138        Ok(())
139    }
140
141    /// Blocks until there are no more tx buffers in flight to the backing
142    /// device.
143    ///
144    /// Note that this method does not prevent new buffers from being allocated
145    /// and sent, it is up to the caller to prevent any races. This future will
146    /// resolve as soon as it observes a tx idle event. That is, there are no
147    /// frames in flight to the backing device at all and the session currently
148    /// owns all allocated tx buffers.
149    ///
150    /// The synchronization guarantee provided by this method is that any
151    /// buffers previously given to [`Session::send`] will be accounted as
152    /// pending until the device has replied back.
153    pub async fn wait_tx_idle(&self) {
154        self.inner.tx_idle_listeners.wait().await;
155    }
156
157    /// Returns a stream of delegated rx leases from the device.
158    ///
159    /// Leases are yielded from the stream whenever the corresponding receive
160    /// buffer is dropped or reused for tx, which marks the end of processing
161    /// the marked buffer for the delegated lease.
162    ///
163    /// See [`fidl_fuchsia_hardware_network::DelegatedRxLease`] for more
164    /// details.
165    ///
166    /// # Panics
167    ///
168    /// Panics if the session was not created with
169    /// [`fidl_fuchsia_hardware_network::SessionFlags::RECEIVE_RX_POWER_LEASES`]
170    /// or if `watch_rx_leases` has already been called for this session.
171    pub fn watch_rx_leases(&self) -> impl Stream<Item = Result<RxLease>> + Send + Sync + use<> {
172        let inner = Arc::clone(&self.inner);
173        let watcher = RxLeaseWatcher::new(Arc::clone(&inner.pool));
174        futures::stream::try_unfold((inner, watcher), |(inner, mut watcher)| async move {
175            let DelegatedRxLease {
176                hold_until_frame,
177                handle,
178                __source_breaking: fidl::marker::SourceBreaking,
179            } = match inner.proxy.watch_delegated_rx_lease().await {
180                Ok(lease) => lease,
181                Err(e) => {
182                    if e.is_closed() {
183                        return Ok(None);
184                    } else {
185                        return Err(Error::Fidl(e));
186                    }
187                }
188            };
189            let hold_until_frame = hold_until_frame.ok_or(Error::InvalidLease)?;
190            let handle = RxLease { handle: handle.ok_or(Error::InvalidLease)? };
191
192            watcher.wait_until(hold_until_frame).await;
193            Ok(Some((handle, (inner, watcher))))
194        })
195    }
196
197    /// Closes the session.
198    pub async fn close(&self) -> Result<()> {
199        self.inner.proxy.close()?;
200        // Synchronize by waiting for the channel to be closed.
201        let mut event_stream = self.inner.proxy.take_event_stream();
202        while let Some(event) = event_stream.next().await {
203            match event {
204                Err(fidl::Error::ClientChannelClosed { .. }) => break,
205                Ok(_) => {} // Ignore other events.
206                Err(e) => return Err(Error::Fidl(e)),
207            }
208        }
209        Ok(())
210    }
211}
212
213struct Inner {
214    pool: Arc<Pool>,
215    proxy: netdev::SessionProxy,
216    name: String,
217    rx: fasync::Fifo<DescId<Rx>>,
218    tx: fasync::Fifo<DescId<Tx>>,
219    // Pending tx descriptors to be sent.
220    tx_pending: Pending<Tx>,
221    rx_ready: Mutex<ReadyBuffer<DescId<Rx>>>,
222    tx_ready: Mutex<ReadyBuffer<DescId<Tx>>>,
223    tx_idle_listeners: TxIdleListeners,
224}
225
226impl Inner {
227    /// Creates a new session.
228    async fn new(device: &netdev::DeviceProxy, name: &str, config: Config) -> Result<Arc<Self>> {
229        let (pool, descriptors, data) = Pool::new(config)?;
230
231        let session_info = {
232            // The following two constants are not provided by user, panic
233            // instead of returning an error.
234            let descriptor_length =
235                u8::try_from(NETWORK_DEVICE_DESCRIPTOR_LENGTH / std::mem::size_of::<u64>())
236                    .expect("descriptor length in 64-bit words not representable by u8");
237            let data = vec![fidl_fuchsia_hardware_network::DataVmo {
238                id: Some(0),
239                vmo: Some(data),
240                num_rx_buffers: Some(config.num_rx_buffers.get()),
241                __source_breaking: fidl::marker::SourceBreaking,
242            }];
243            netdev::SessionInfo {
244                descriptors: Some(descriptors),
245                data: Some(data),
246                descriptor_version: Some(NETWORK_DEVICE_DESCRIPTOR_VERSION),
247                descriptor_length: Some(descriptor_length),
248                descriptor_count: Some(config.num_tx_buffers.get() + config.num_rx_buffers.get()),
249                options: Some(config.options),
250                ..Default::default()
251            }
252        };
253
254        let (client, netdev::Fifos { rx, tx }) = device
255            .open_session(name, session_info)
256            .await?
257            .map_err(|raw| Error::Open(name.to_owned(), zx::Status::from_raw(raw)))?;
258        let proxy = client.into_proxy();
259        let rx = fasync::Fifo::from_fifo(rx);
260        let tx = fasync::Fifo::from_fifo(tx);
261
262        Ok(Arc::new(Self {
263            pool,
264            proxy,
265            name: name.to_owned(),
266            rx,
267            tx,
268            tx_pending: Pending::new(Vec::new()),
269            rx_ready: Mutex::new(ReadyBuffer::new(config.num_rx_buffers.get().into())),
270            tx_ready: Mutex::new(ReadyBuffer::new(config.num_tx_buffers.get().into())),
271            tx_idle_listeners: TxIdleListeners::new(),
272        }))
273    }
274
275    /// Polls to submit available rx descriptors from pool to driver.
276    ///
277    /// Returns the number of rx descriptors that are submitted.
278    fn poll_submit_rx(&self, cx: &mut Context<'_>) -> Poll<Result<usize>> {
279        self.pool.rx_pending.poll_submit(&self.rx, cx)
280    }
281
282    /// Polls completed rx descriptors from the driver.
283    ///
284    /// Returns the the head of a completed rx descriptor chain.
285    fn poll_complete_rx(&self, cx: &mut Context<'_>) -> Poll<Result<DescId<Rx>>> {
286        let mut rx_ready = self.rx_ready.lock();
287        rx_ready.poll_with_fifo(cx, &self.rx).map_err(|status| Error::Fifo("read", "rx", status))
288    }
289
290    /// Polls to submit tx descriptors that are pending to the driver.
291    ///
292    /// Returns the number of tx descriptors that are successfully submitted.
293    fn poll_submit_tx(&self, cx: &mut Context<'_>) -> Poll<Result<usize>> {
294        self.tx_pending.poll_submit(&self.tx, cx)
295    }
296
297    /// Polls completed tx descriptors from the driver then puts them in pool.
298    fn poll_complete_tx(&self, cx: &mut Context<'_>) -> Poll<Result<()>> {
299        let result = {
300            let mut tx_ready = self.tx_ready.lock();
301            // TODO(https://github.com/rust-lang/rust/issues/63569): Provide entire
302            // chain of completed descriptors to the pool at once when slice of
303            // MaybeUninit is stabilized.
304            tx_ready.poll_with_fifo(cx, &self.tx).map(|r| match r {
305                Ok(desc) => self.pool.tx_completed(desc),
306                Err(status) => Err(Error::Fifo("read", "tx", status)),
307            })
308        };
309
310        match &result {
311            Poll::Ready(Ok(())) => self.tx_idle_listeners.tx_complete(),
312            Poll::Pending | Poll::Ready(Err(_)) => {}
313        }
314        result
315    }
316
317    /// Sends the [`Buffer`] to the driver.
318    ///
319    /// Note: Transmit is completely infallible because the buffer layout and
320    /// zero-padding are already fully resolved and verified upfront during
321    /// buffer allocation (see `AllocGuard::init` in `pool.rs` for details
322    /// and design tradeoffs).
323    fn send(&self, buffer: Buffer<Tx>) {
324        self.tx_idle_listeners.tx_started();
325        self.tx_pending.extend(std::iter::once(buffer.leak()));
326    }
327
328    /// Receives a [`Buffer`] from the driver.
329    ///
330    /// Waits until there is completed rx buffers from the driver.
331    async fn recv(&self) -> Result<Buffer<Rx>> {
332        poll_fn(|cx| -> Poll<Result<Buffer<Rx>>> {
333            let head = ready!(self.poll_complete_rx(cx))?;
334            Poll::Ready(self.pool.rx_completed(head))
335        })
336        .await
337    }
338}
339
340/// The backing task that drives the session.
341///
342/// A session will stop making progress if this task is not polled continuously.
343#[must_use = "futures do nothing unless you `.await` or poll them"]
344pub struct Task {
345    inner: Arc<Inner>,
346}
347
348impl Future for Task {
349    type Output = Result<()>;
350    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
351        let inner = &Pin::into_inner(self).inner;
352        loop {
353            let mut all_pending = true;
354            // TODO(https://fxbug.dev/42158458): poll once for all completed
355            // descriptors if this becomes a performance bottleneck.
356            while inner.poll_complete_tx(cx)?.is_ready_checked::<()>() {
357                all_pending = false;
358            }
359            if inner.poll_submit_rx(cx)?.is_ready_checked::<usize>() {
360                all_pending = false;
361            }
362            if inner.poll_submit_tx(cx)?.is_ready_checked::<usize>() {
363                all_pending = false;
364            }
365            if all_pending {
366                return Poll::Pending;
367            }
368        }
369    }
370}
371
/// Session configuration.
///
/// All fields are private. Within this file a `Config` is produced by
/// [`DeviceInfo::make_config`], which validates the values against the
/// device's requirements.
#[derive(Debug, Clone, Copy)]
pub struct Config {
    /// Buffer stride on VMO, in bytes.
    buffer_stride: NonZeroU64,
    /// Number of rx descriptors to allocate.
    num_rx_buffers: NonZeroU16,
    /// Number of tx descriptors to allocate.
    num_tx_buffers: NonZeroU16,
    /// Session flags, forwarded as the session options when the session is
    /// opened.
    options: netdev::SessionFlags,
    /// Buffer layout.
    buffer_layout: BufferLayout,
}
386
/// Describes the buffer layout that [`Pool`] needs to know.
///
/// Populated by [`DeviceInfo::make_config`] from the device's reported
/// requirements.
#[derive(Debug, Clone, Copy)]
struct BufferLayout {
    /// Minimum tx buffer data length.
    min_tx_data: usize,
    /// Minimum tx buffer head length.
    min_tx_head: u16,
    /// Minimum tx buffer tail length.
    min_tx_tail: u16,
    /// The length of a buffer.
    length: usize,
}
399
/// Network device base info with all required fields.
///
/// Validated counterpart of `netdev::DeviceBaseInfo`, generated through the
/// `ValidFidlTable` derive.
#[derive(Debug, Clone, ValidFidlTable)]
#[fidl_table_src(netdev::DeviceBaseInfo)]
#[fidl_table_strict]
pub struct DeviceBaseInfo {
    /// Maximum number of items in rx FIFO (per session).
    pub rx_depth: u16,
    /// Maximum number of items in tx FIFO (per session).
    pub tx_depth: u16,
    /// Alignment requirement for buffers in the data VMO.
    pub buffer_alignment: u32,
    /// Maximum supported length of buffers in the data VMO, in bytes.
    ///
    /// `None` is treated as "no limit" by [`DeviceInfo::make_config`].
    #[fidl_field_type(optional)]
    pub max_buffer_length: Option<NonZeroU32>,
    /// The minimum rx buffer length required for device.
    pub min_rx_buffer_length: u32,
    /// The minimum tx buffer length required for the device.
    pub min_tx_buffer_length: u32,
    /// The number of bytes the device requests be free as `head` space in a tx buffer.
    pub min_tx_buffer_head: u16,
    /// The amount of bytes the device requests be free as `tail` space in a tx buffer.
    pub min_tx_buffer_tail: u16,
    /// Maximum descriptor chain length accepted by the device.
    pub max_buffer_parts: u8,
    /// Minimum amount of Rx buffers the client needs to prepare for the
    /// network device.
    #[fidl_field_type(optional)]
    pub min_rx_buffers: Option<NonZeroU16>,
    /// Available rx acceleration flags for this device.
    #[fidl_field_type(default)]
    pub rx_accel: Vec<netdev::RxAcceleration>,
    /// Available tx acceleration flags for this device.
    #[fidl_field_type(default)]
    pub tx_accel: Vec<netdev::TxAcceleration>,
}
435
/// Network device information with all required fields.
///
/// Validated counterpart of `netdev::DeviceInfo`, generated through the
/// `ValidFidlTable` derive. Use [`DeviceInfo::make_config`] to derive a
/// session [`Config`] from it.
#[derive(Debug, Clone, ValidFidlTable)]
#[fidl_table_src(netdev::DeviceInfo)]
#[fidl_table_strict]
pub struct DeviceInfo {
    /// Minimum descriptor length, in 64-bit words.
    pub min_descriptor_length: u8,
    /// Accepted descriptor version.
    pub descriptor_version: u8,
    /// Device base info.
    pub base_info: DeviceBaseInfo,
}
448
/// The user-tunable part of a session configuration.
///
/// Hand one of these to [`DeviceInfo`] to derive a full [`Config`].
#[derive(Debug, Copy, Clone)]
pub struct DerivableConfig {
    /// The desired default buffer length for the session.
    pub default_buffer_length: usize,
    /// Enable rx lease watching.
    pub watch_rx_leases: bool,
}

impl DerivableConfig {
    /// A sensibly common default buffer length to be used in
    /// [`DerivableConfig`]. Provided to ease test writing.
    ///
    /// Chosen to be the next power of two after the default Ethernet MTU.
    ///
    /// The `Default` impl uses this value as its buffer length.
    pub const DEFAULT_BUFFER_LENGTH: usize = 2048;

    /// The value returned by the `Default` impl: the default buffer length
    /// with rx lease watching disabled.
    pub const DEFAULT: DerivableConfig = DerivableConfig {
        watch_rx_leases: false,
        default_buffer_length: DerivableConfig::DEFAULT_BUFFER_LENGTH,
    };
}

impl Default for DerivableConfig {
    /// Returns [`DerivableConfig::DEFAULT`].
    fn default() -> DerivableConfig {
        DerivableConfig::DEFAULT
    }
}
477
478impl DeviceInfo {
479    /// Create a new session config from the device information.
480    ///
481    /// This method also does the boundary checks so that data_length/offset fields read
482    /// from descriptors are safe to convert to [`usize`].
483    pub fn make_config(&self, config: DerivableConfig) -> Result<Config> {
484        let DeviceInfo {
485            min_descriptor_length,
486            descriptor_version,
487            base_info:
488                DeviceBaseInfo {
489                    rx_depth,
490                    tx_depth,
491                    buffer_alignment,
492                    max_buffer_length,
493                    min_rx_buffer_length,
494                    min_tx_buffer_length,
495                    min_tx_buffer_head,
496                    min_tx_buffer_tail,
497                    max_buffer_parts: _,
498                    min_rx_buffers: _,
499                    rx_accel: _,
500                    tx_accel: _,
501                },
502        } = self;
503        if NETWORK_DEVICE_DESCRIPTOR_VERSION != *descriptor_version {
504            return Err(Error::Config(format!(
505                "descriptor version mismatch: {} != {}",
506                NETWORK_DEVICE_DESCRIPTOR_VERSION, descriptor_version
507            )));
508        }
509        if NETWORK_DEVICE_DESCRIPTOR_LENGTH < usize::from(*min_descriptor_length) {
510            return Err(Error::Config(format!(
511                "descriptor length too small: {} < {}",
512                NETWORK_DEVICE_DESCRIPTOR_LENGTH, min_descriptor_length
513            )));
514        }
515
516        let DerivableConfig { default_buffer_length, watch_rx_leases } = config;
517
518        let num_rx_buffers =
519            NonZeroU16::new(*rx_depth).ok_or_else(|| Error::Config("no RX buffers".to_owned()))?;
520        let num_tx_buffers =
521            NonZeroU16::new(*tx_depth).ok_or_else(|| Error::Config("no TX buffers".to_owned()))?;
522
523        let max_buffer_length = max_buffer_length
524            .and_then(|max| {
525                // The error case is the case where max_buffer_length can't fix in a
526                // usize, but we use it to compare it to usizes, so that's
527                // equivalent to no limit.
528                usize::try_from(max.get()).ok_checked::<TryFromIntError>()
529            })
530            .unwrap_or(usize::MAX);
531        let min_buffer_length = usize::try_from(*min_rx_buffer_length)
532            .ok_checked::<TryFromIntError>()
533            .unwrap_or(usize::MAX);
534
535        let buffer_length =
536            usize::min(max_buffer_length, usize::max(min_buffer_length, default_buffer_length));
537
538        let buffer_alignment = usize::try_from(*buffer_alignment).map_err(
539            |std::num::TryFromIntError { .. }| {
540                Error::Config(format!(
541                    "buffer_alignment not representable within usize: {}",
542                    buffer_alignment,
543                ))
544            },
545        )?;
546
547        let buffer_stride = buffer_length
548            .checked_add(buffer_alignment - 1)
549            .map(|x| x / buffer_alignment * buffer_alignment)
550            .ok_or_else(|| {
551                Error::Config(format!(
552                    "not possible to align {} to {} under usize::MAX",
553                    buffer_length, buffer_alignment,
554                ))
555            })?;
556
557        if buffer_stride < buffer_length {
558            return Err(Error::Config(format!(
559                "buffer stride too small {} < {}",
560                buffer_stride, buffer_length
561            )));
562        }
563
564        if buffer_length < usize::from(*min_tx_buffer_head) + usize::from(*min_tx_buffer_tail) {
565            return Err(Error::Config(format!(
566                "buffer length {} does not meet minimum tx buffer head/tail requirement {}/{}",
567                buffer_length, min_tx_buffer_head, min_tx_buffer_tail,
568            )));
569        }
570
571        let num_buffers =
572            rx_depth.checked_add(*tx_depth).filter(|num| *num != u16::MAX).ok_or_else(|| {
573                Error::Config(format!(
574                    "too many buffers requested: {} + {} > u16::MAX",
575                    rx_depth, tx_depth
576                ))
577            })?;
578
579        let buffer_stride =
580            u64::try_from(buffer_stride).map_err(|std::num::TryFromIntError { .. }| {
581                Error::Config(format!("buffer_stride too big: {} > u64::MAX", buffer_stride))
582            })?;
583
584        // This is following the practice of rust stdlib to ensure allocation
585        // size never reaches isize::MAX.
586        // https://doc.rust-lang.org/std/primitive.pointer.html#method.add-1.
587        match buffer_stride.checked_mul(num_buffers.into()).map(isize::try_from) {
588            None | Some(Err(std::num::TryFromIntError { .. })) => {
589                return Err(Error::Config(format!(
590                    "too much memory required for the buffers: {} * {} > isize::MAX",
591                    buffer_stride, num_buffers
592                )));
593            }
594            Some(Ok(_total)) => (),
595        };
596
597        let buffer_stride = NonZeroU64::new(buffer_stride)
598            .ok_or_else(|| Error::Config("buffer_stride is zero".to_owned()))?;
599
600        let min_tx_data = match usize::try_from(*min_tx_buffer_length)
601            .map(|min_tx| (min_tx <= buffer_length).then_some(min_tx))
602        {
603            Ok(Some(min_tx_buffer_length)) => min_tx_buffer_length,
604            // Either the conversion or the comparison failed.
605            Ok(None) | Err(std::num::TryFromIntError { .. }) => {
606                return Err(Error::Config(format!(
607                    "buffer_length smaller than minimum TX requirement: {} < {}",
608                    buffer_length, *min_tx_buffer_length
609                )));
610            }
611        };
612
613        let mut options = netdev::SessionFlags::empty();
614        options.set(netdev::SessionFlags::RECEIVE_RX_POWER_LEASES, watch_rx_leases);
615
616        Ok(Config {
617            buffer_stride,
618            num_rx_buffers,
619            num_tx_buffers,
620            options,
621            buffer_layout: BufferLayout {
622                length: buffer_length,
623                min_tx_head: *min_tx_buffer_head,
624                min_tx_tail: *min_tx_buffer_tail,
625                min_tx_data,
626            },
627        })
628    }
629}
630
631/// A port of the device.
632#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
633pub struct Port {
634    pub(crate) base: u8,
635    pub(crate) salt: u8,
636}
637
638impl TryFrom<netdev::PortId> for Port {
639    type Error = Error;
640    fn try_from(netdev::PortId { base, salt }: netdev::PortId) -> Result<Self> {
641        if base <= netdev::MAX_PORTS {
642            Ok(Self { base, salt })
643        } else {
644            Err(Error::InvalidPortId(base))
645        }
646    }
647}
648
649impl From<Port> for netdev::PortId {
650    fn from(Port { base, salt }: Port) -> Self {
651        Self { base, salt }
652    }
653}
654
655/// Pending descriptors to be sent to driver.
656struct Pending<K: AllocKind> {
657    inner: Mutex<(Vec<DescId<K>>, Option<Waker>)>,
658}
659
660impl<K: AllocKind> Pending<K> {
661    fn new(descs: Vec<DescId<K>>) -> Self {
662        Self { inner: Mutex::new((descs, None)) }
663    }
664
665    /// Extends the pending descriptors buffer.
666    fn extend(&self, descs: impl IntoIterator<Item = DescId<K>>) {
667        let mut guard = self.inner.lock();
668        let (storage, waker) = &mut *guard;
669        storage.extend(descs);
670        if let Some(waker) = waker.take() {
671            waker.wake();
672        }
673    }
674
675    /// Submits the pending buffer to the driver through [`zx::Fifo`].
676    ///
677    /// It will return [`Poll::Pending`] if any of the following happens:
678    ///   - There are no descriptors pending.
679    ///   - The fifo is not ready for write.
680    fn poll_submit(
681        &self,
682        fifo: &fasync::Fifo<DescId<K>>,
683        cx: &mut Context<'_>,
684    ) -> Poll<Result<usize>> {
685        let mut guard = self.inner.lock();
686        let (storage, waker) = &mut *guard;
687        if storage.is_empty() {
688            *waker = Some(cx.waker().clone());
689            return Poll::Pending;
690        }
691
692        // TODO(https://fxbug.dev/42107145): We're assuming that writing to the
693        // FIFO here is a sufficient memory barrier for the other end to access
694        // the data. That is currently true but not really guaranteed by the
695        // API.
696        let submitted = ready!(fifo.try_write(cx, &storage[..]))
697            .map_err(|status| Error::Fifo("write", K::REFL.as_str(), status))?;
698        let _drained = storage.drain(0..submitted);
699        Poll::Ready(Ok(submitted))
700    }
701}
702
703/// An intermediary buffer used to reduce syscall overhead by acting as a proxy
704/// to read entries from a FIFO.
705///
706/// `ReadyBuffer` caches read entries from a FIFO in pre-allocated memory,
707/// allowing different batch sizes between what is acquired from the FIFO and
708/// what's processed by the caller.
709struct ReadyBuffer<T> {
710    // NB: A vector of `MaybeUninit` here allows us to give a transparent memory
711    // layout to the FIFO object but still move objects out of our buffer
712    // without needing a `T: Default` implementation. There's a small added
713    // benefit of not paying for memory initialization on creation as well, but
714    // that's mostly negligible given all allocation is performed upfront.
715    data: Vec<MaybeUninit<T>>,
716    available: Range<usize>,
717}
718
719impl<T> Drop for ReadyBuffer<T> {
720    fn drop(&mut self) {
721        let Self { data, available } = self;
722        for initialized in &mut data[available.clone()] {
723            // SAFETY: the available range keeps track of initialized buffers,
724            // we must drop them on drop to uphold `MaybeUninit` expectations.
725            unsafe { initialized.assume_init_drop() }
726        }
727        *available = 0..0;
728    }
729}
730
731impl<T> ReadyBuffer<T> {
732    fn new(capacity: usize) -> Self {
733        let data = std::iter::from_fn(|| Some(MaybeUninit::uninit())).take(capacity).collect();
734        Self { data, available: 0..0 }
735    }
736
737    fn poll_with_fifo(
738        &mut self,
739        cx: &mut Context<'_>,
740        fifo: &fuchsia_async::Fifo<T>,
741    ) -> Poll<std::result::Result<T, zx::Status>>
742    where
743        T: fasync::FifoEntry,
744    {
745        let Self { data, available: Range { start, end } } = self;
746
747        loop {
748            // Always pop from available data first.
749            if *start != *end {
750                let desc = std::mem::replace(&mut data[*start], MaybeUninit::uninit());
751                *start += 1;
752                // SAFETY: Descriptor was in the initialized section, it was
753                // initialized.
754                let desc = unsafe { desc.assume_init() };
755                return Poll::Ready(Ok(desc));
756            }
757            // Fetch more from the FIFO.
758            let count = ready!(fifo.try_read(cx, &mut data[..]))?;
759            *start = 0;
760            *end = count;
761        }
762    }
763}
764
765struct TxIdleListeners {
766    event: event_listener::Event,
767    tx_in_flight: AtomicUsize,
768}
769
770impl TxIdleListeners {
771    fn new() -> Self {
772        Self { event: event_listener::Event::new(), tx_in_flight: AtomicUsize::new(0) }
773    }
774
775    /// Decreases the number of outstanding tx buffers by 1.
776    ///
777    /// Notifies any tx idle listeners if the number reaches 0.
778    fn tx_complete(&self) {
779        let Self { event, tx_in_flight } = self;
780        let old_value = tx_in_flight.fetch_sub(1, atomic::Ordering::SeqCst);
781        debug_assert_ne!(old_value, 0);
782        if old_value == 1 {
783            let _notified: usize = event.notify(usize::MAX);
784        }
785    }
786
787    /// Increases the number of outstanding tx buffers by 1.
788    fn tx_started(&self) {
789        let Self { event: _, tx_in_flight } = self;
790        let _: usize = tx_in_flight.fetch_add(1, atomic::Ordering::SeqCst);
791    }
792
793    async fn wait(&self) {
794        let Self { event, tx_in_flight } = self;
795        // This is _the correct way_ of holding an `event_listener::Listener`.
796        // We check the condition before installing the listener in the fast
797        // case, then we must check the condition again after creating the
798        // listener in case we've raced with the condition updating. Finally we
799        // must loop and check the condition again because we're not fully
800        // guaranteed to not have spurious wakeups.
801        //
802        // See the event_listener crate documentation for more details.
803        loop {
804            if tx_in_flight.load(atomic::Ordering::SeqCst) == 0 {
805                return;
806            }
807
808            event_listener::listener!(event => listener);
809
810            if tx_in_flight.load(atomic::Ordering::SeqCst) == 0 {
811                return;
812            }
813
814            listener.await;
815        }
816    }
817}
818
819/// An RAII lease possibly keeping the system from suspension.
820///
821/// Yielded from [`Session::watch_rx_leases`].
822///
823/// Dropping an `RxLease` relinquishes it.
824#[derive(Debug)]
825pub struct RxLease {
826    handle: netdev::DelegatedRxLeaseHandle,
827}
828
829impl Drop for RxLease {
830    fn drop(&mut self) {
831        let Self { handle } = self;
832        // Change detector in case we need any evolution on how to relinquish
833        // leases.
834        match handle {
835            netdev::DelegatedRxLeaseHandle::Channel(_channel) => {
836                // Dropping the channel is enough to relinquish the lease.
837            }
838            netdev::DelegatedRxLeaseHandle::Eventpair(_eventpair) => {
839                // Dropping the eventpair is enough to relinquish the lease.
840            }
841            netdev::DelegatedRxLeaseHandle::__SourceBreaking { .. } => {}
842        }
843    }
844}
845
846impl RxLease {
847    /// Peeks the internal lease.
848    pub fn inner(&self) -> &netdev::DelegatedRxLeaseHandle {
849        &self.handle
850    }
851}
852
#[cfg(test)]
mod tests {
    use std::num::NonZeroU32;
    use std::ops::Deref;
    use std::sync::Arc;
    use std::task::Poll;

    use assert_matches::assert_matches;
    use fuchsia_async::Fifo;
    use test_case::test_case;
    use zerocopy::{FromBytes, Immutable, IntoBytes};

    use crate::session::DerivableConfig;

    use super::buffer::{
        AllocKind, DescId, NETWORK_DEVICE_DESCRIPTOR_LENGTH, NETWORK_DEVICE_DESCRIPTOR_VERSION,
    };
    use super::{
        BufferLayout, Config, DeviceBaseInfo, DeviceInfo, Error, Inner, Mutex, Pending, Pool,
        ReadyBuffer, Task, TxIdleListeners,
    };

    /// Baseline device base info; individual test cases override single
    /// fields of it with struct update syntax.
    const DEFAULT_DEVICE_BASE_INFO: DeviceBaseInfo = DeviceBaseInfo {
        rx_depth: 1,
        tx_depth: 1,
        buffer_alignment: 1,
        max_buffer_length: None,
        min_rx_buffer_length: 0,
        min_tx_buffer_head: 0,
        min_tx_buffer_length: 0,
        min_tx_buffer_tail: 0,
        max_buffer_parts: fidl_fuchsia_hardware_network::MAX_DESCRIPTOR_CHAIN,
        min_rx_buffers: None,
        rx_accel: Vec::new(),
        tx_accel: Vec::new(),
    };

    /// Baseline device info built on top of [`DEFAULT_DEVICE_BASE_INFO`].
    const DEFAULT_DEVICE_INFO: DeviceInfo = DeviceInfo {
        min_descriptor_length: 0,
        descriptor_version: 1,
        base_info: DEFAULT_DEVICE_BASE_INFO,
    };

    /// Default buffer length requested by the tests when deriving a config.
    const DEFAULT_BUFFER_LENGTH: usize = 2048;

    /// Verifies that deriving a config from `info` with a default buffer
    /// length of `buffer_length` fails with `Error::Config` carrying exactly
    /// the `expected` message.
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        min_descriptor_length: u8::MAX,
        ..DEFAULT_DEVICE_INFO
    }, format!("descriptor length too small: {} < {}", NETWORK_DEVICE_DESCRIPTOR_LENGTH, u8::MAX))]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        descriptor_version: 42,
        ..DEFAULT_DEVICE_INFO
    }, format!("descriptor version mismatch: {} != {}", NETWORK_DEVICE_DESCRIPTOR_VERSION, 42))]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            tx_depth: 0,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, "no TX buffers")]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            rx_depth: 0,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, "no RX buffers")]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            tx_depth: u16::MAX,
            rx_depth: u16::MAX,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, format!("too many buffers requested: {} + {} > u16::MAX", u16::MAX, u16::MAX))]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            min_tx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 + 1,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, format!(
        "buffer_length smaller than minimum TX requirement: {} < {}",
        DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            min_tx_buffer_head: DEFAULT_BUFFER_LENGTH as u16 + 1,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, format!(
        "buffer length {} does not meet minimum tx buffer head/tail requirement {}/0",
        DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            min_tx_buffer_tail: DEFAULT_BUFFER_LENGTH as u16 + 1,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, format!(
        "buffer length {} does not meet minimum tx buffer head/tail requirement 0/{}",
        DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
    #[test_case(0, DEFAULT_DEVICE_INFO, "buffer_stride is zero")]
    #[test_case(usize::MAX, DEFAULT_DEVICE_INFO,
    format!(
        "too much memory required for the buffers: {} * {} > isize::MAX",
        usize::MAX, 2))]
    #[test_case(usize::MAX, DeviceInfo {
        base_info: DeviceBaseInfo {
            buffer_alignment: 2,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, format!(
        "not possible to align {} to {} under usize::MAX",
        usize::MAX, 2))]
    fn configs_from_device_info_err(
        buffer_length: usize,
        info: DeviceInfo,
        expected: impl Deref<Target = str>,
    ) {
        let config = DerivableConfig { default_buffer_length: buffer_length, ..Default::default() };
        assert_matches!(
            info.make_config(config),
            Err(Error::Config(got)) if got.as_str() == expected.deref()
        );
    }

    /// Verifies the buffer length of the config derived from `info` when
    /// [`DEFAULT_BUFFER_LENGTH`] is requested as the default buffer length.
    #[test_case(DeviceInfo {
        base_info: DeviceBaseInfo {
            min_rx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 + 1,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, DEFAULT_BUFFER_LENGTH + 1; "default below min")]
    #[test_case(DeviceInfo {
        base_info: DeviceBaseInfo {
            max_buffer_length: NonZeroU32::new(DEFAULT_BUFFER_LENGTH as u32 - 1),
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, DEFAULT_BUFFER_LENGTH - 1; "default above max")]
    #[test_case(DeviceInfo {
        base_info: DeviceBaseInfo {
            min_rx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 - 1,
            max_buffer_length: NonZeroU32::new(DEFAULT_BUFFER_LENGTH as u32 + 1),
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, DEFAULT_BUFFER_LENGTH; "default in bounds")]
    fn configs_from_device_buffer_length(info: DeviceInfo, expected_length: usize) {
        let config = info
            .make_config(DerivableConfig {
                default_buffer_length: DEFAULT_BUFFER_LENGTH,
                ..Default::default()
            })
            .expect("is valid");
        // Destructure exhaustively so that new `Config` fields force this test
        // to be revisited.
        let Config {
            buffer_layout: BufferLayout { length, min_tx_data: _, min_tx_head: _, min_tx_tail: _ },
            buffer_stride: _,
            num_rx_buffers: _,
            num_tx_buffers: _,
            options: _,
        } = config;
        assert_eq!(length, expected_length);
    }

    /// Creates a FIFO pair of descriptor ids, returning one end wrapped in the
    /// async [`Fifo`] type and the other end as a raw `zx::Fifo`.
    fn make_fifos<K: AllocKind>() -> (Fifo<DescId<K>>, zx::Fifo<DescId<K>>) {
        let (handle, other_end) = zx::Fifo::create(1).unwrap();
        (Fifo::from_fifo(handle), other_end)
    }

    /// Returns `fifo` with its handle replaced by one that lacks
    /// `rights_to_remove`.
    ///
    /// Panics if the handle's rights cannot be retrieved or the handle cannot
    /// be replaced.
    fn remove_rights<T: FromBytes + IntoBytes + Immutable>(
        fifo: Fifo<T>,
        rights_to_remove: zx::Rights,
    ) -> Fifo<T> {
        let fifo = zx::Fifo::from(fifo);
        let rights = fifo.as_handle_ref().basic_info().expect("can retrieve info").rights;

        // Use set difference rather than XOR: XOR would toggle the bits and so
        // ask for any right in `rights_to_remove` that the handle does not
        // currently hold, instead of leaving it absent.
        let fifo = fifo.replace_handle(rights - rights_to_remove).expect("can replace");
        Fifo::from_fifo(fifo)
    }

    /// Selects which of the task's FIFOs has its rights attenuated.
    enum TxOrRx {
        Tx,
        Rx,
    }

    // Each case is labelled "<fifo> <right removed>".
    #[test_case(TxOrRx::Tx, zx::Rights::READ; "tx read")]
    #[test_case(TxOrRx::Tx, zx::Rights::WRITE; "tx write")]
    #[test_case(TxOrRx::Rx, zx::Rights::WRITE; "rx write")]
    #[fuchsia_async::run_singlethreaded(test)]
    async fn task_as_future_poll_error(which_fifo: TxOrRx, right_to_remove: zx::Rights) {
        // This is a regression test for https://fxbug.dev/42072513. The flake
        // that caused that bug occurred because the Zircon channel was closed
        // but the error returned by a failed attempt to write to it wasn't
        // being propagated upwards. This test produces a similar situation by
        // altering the right on the FIFOs the task uses so as to cause either
        // an attempt to write or to read to fail. For completeness, it
        // exercises all the FIFO polls that comprise Task::poll.
        let (pool, _descriptors, _data) = Pool::new(
            DEFAULT_DEVICE_INFO
                .make_config(DerivableConfig {
                    default_buffer_length: DEFAULT_BUFFER_LENGTH,
                    ..Default::default()
                })
                .expect("is valid"),
        )
        .expect("is valid");
        let (session_proxy, _session_server) =
            fidl::endpoints::create_proxy::<fidl_fuchsia_hardware_network::SessionMarker>();

        let (rx, _rx_sender) = make_fifos();
        let (tx, _tx_receiver) = make_fifos();

        // Attenuate rights on one of the FIFOs.
        let (tx, rx) = match which_fifo {
            TxOrRx::Tx => (remove_rights(tx, right_to_remove), rx),
            TxOrRx::Rx => (tx, remove_rights(rx, right_to_remove)),
        };

        let buf = pool.alloc_tx_buffer(1).await.expect("can allocate");
        let inner = Arc::new(Inner {
            pool,
            proxy: session_proxy,
            name: "fake_task".to_string(),
            rx,
            tx,
            tx_pending: Pending::new(vec![]),
            rx_ready: Mutex::new(ReadyBuffer::new(10)),
            tx_ready: Mutex::new(ReadyBuffer::new(10)),
            tx_idle_listeners: TxIdleListeners::new(),
        });

        // Queue a tx buffer so the task has something to send.
        inner.send(buf);

        let mut task = Task { inner };

        // The task should not be able to continue because it can't read from or
        // write to one of the FIFOs.
        assert_matches!(futures::poll!(&mut task), Poll::Ready(Err(Error::Fifo(_, _, _))));
    }
}