Skip to main content

netdevice_client/session/
mod.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Fuchsia netdevice session.
6
7mod buffer;
8
9use std::fmt::Debug;
10use std::mem::MaybeUninit;
11use std::num::{NonZeroU16, NonZeroU32, NonZeroU64, TryFromIntError};
12use std::ops::Range;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::sync::atomic::{self, AtomicUsize};
16use std::task::Waker;
17
18use explicit::{PollExt as _, ResultExt as _};
19use fidl_fuchsia_hardware_network as netdev;
20use fidl_fuchsia_hardware_network::DelegatedRxLease;
21use fidl_table_validation::ValidFidlTable;
22use fuchsia_async as fasync;
23use fuchsia_sync::Mutex;
24use futures::future::{Future, poll_fn};
25use futures::task::{Context, Poll};
26use futures::{Stream, StreamExt as _, ready};
27
28use crate::error::{Error, Result};
29use buffer::pool::{Pool, RxLeaseWatcher};
30use buffer::{
31    AllocKind, DescId, NETWORK_DEVICE_DESCRIPTOR_LENGTH, NETWORK_DEVICE_DESCRIPTOR_VERSION,
32};
33pub use buffer::{Buffer, Rx, SinglePartTxBuffer, Tx};
34
35/// A session between network device client and driver.
36#[derive(Clone)]
37pub struct Session {
38    inner: Arc<Inner>,
39}
40
41impl Debug for Session {
42    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43        let Self { inner } = self;
44        let Inner {
45            name,
46            pool: _,
47            proxy: _,
48            rx: _,
49            tx: _,
50            tx_pending: _,
51            rx_ready: _,
52            tx_ready: _,
53            tx_idle_listeners: _,
54        } = &**inner;
55        f.debug_struct("Session").field("name", &name).finish_non_exhaustive()
56    }
57}
58
59impl Session {
60    /// Creates a new session with the given `name` and `config`.
61    pub async fn new(
62        device: &netdev::DeviceProxy,
63        name: &str,
64        config: Config,
65    ) -> Result<(Self, Task)> {
66        let inner = Inner::new(device, name, config).await?;
67        Ok((Session { inner: Arc::clone(&inner) }, Task { inner }))
68    }
69
70    /// Sends a [`Buffer`] to the network device in this session.
71    pub fn send(&self, buffer: Buffer<Tx>) {
72        self.inner.send(buffer)
73    }
74
75    /// Receives a [`Buffer`] from the network device in this session.
76    pub async fn recv(&self) -> Result<Buffer<Rx>> {
77        self.inner.recv().await
78    }
79
80    /// Allocates a [`Buffer`] that may later be queued to the network device.
81    ///
82    /// The returned buffer will have at least `num_bytes` as size.
83    pub async fn alloc_tx_buffer(&self, num_bytes: usize) -> Result<Buffer<Tx>> {
84        self.inner.pool.alloc_tx_buffer(num_bytes).await
85    }
86
87    /// Tries to allocate a [`SinglePartTxBuffer`].
88    ///
89    /// Returns `Ok(None)` if there is no available buffer, or `Err(Error::TxLength)`
90    /// if the requested size cannot meet the device requirement.
91    pub fn try_alloc_single_part_tx_buffer(
92        &self,
93        num_bytes: usize,
94    ) -> Result<Option<SinglePartTxBuffer>> {
95        self.inner.pool.try_alloc_single_part_tx_buffer(num_bytes)
96    }
97
98    /// Waits for at least one TX buffer to be available and returns an iterator
99    /// of buffers with `num_bytes` as capacity.
100    ///
101    /// The returned iterator is guaranteed to yield at least one item (though
102    /// it might be an error if the requested size cannot meet the device
103    /// requirement).
104    ///
105    /// # Note
106    ///
107    /// Given a `Buffer<Tx>` is returned to the pool when it's dropped, the
108    /// returned iterator will seemingly yield infinite items if the yielded
109    /// `Buffer`s are dropped while iterating.
110    pub async fn alloc_tx_buffers(
111        &self,
112        num_bytes: usize,
113    ) -> Result<impl Iterator<Item = Result<Buffer<Tx>>> + '_> {
114        self.inner.pool.alloc_tx_buffers(num_bytes).await
115    }
116
117    /// Attaches [`Session`] to a port.
118    pub async fn attach(&self, port: Port, rx_frames: &[netdev::FrameType]) -> Result<()> {
119        // NB: Need to bind the future returned by `proxy.attach` to a variable
120        // otherwise this function's (`Session::attach`) returned future becomes
121        // not `Send` and we get unexpected compiler errors at a distance.
122        //
123        // The dyn borrow in the signature of `proxy.attach` seems to be the
124        // cause of the compiler's confusion.
125        let fut = self.inner.proxy.attach(&port.into(), rx_frames);
126        let () = fut.await?.map_err(|raw| Error::Attach(port, zx::Status::from_raw(raw)))?;
127        Ok(())
128    }
129
130    /// Detaches a port from the [`Session`].
131    pub async fn detach(&self, port: Port) -> Result<()> {
132        let () = self
133            .inner
134            .proxy
135            .detach(&port.into())
136            .await?
137            .map_err(|raw| Error::Detach(port, zx::Status::from_raw(raw)))?;
138        Ok(())
139    }
140
141    /// Blocks until there are no more tx buffers in flight to the backing
142    /// device.
143    ///
144    /// Note that this method does not prevent new buffers from being allocated
145    /// and sent, it is up to the caller to prevent any races. This future will
146    /// resolve as soon as it observes a tx idle event. That is, there are no
147    /// frames in flight to the backing device at all and the session currently
148    /// owns all allocated tx buffers.
149    ///
150    /// The synchronization guarantee provided by this method is that any
151    /// buffers previously given to [`Session::send`] will be accounted as
152    /// pending until the device has replied back.
153    pub async fn wait_tx_idle(&self) {
154        self.inner.tx_idle_listeners.wait().await;
155    }
156
157    /// Returns a stream of delegated rx leases from the device.
158    ///
159    /// Leases are yielded from the stream whenever the corresponding receive
160    /// buffer is dropped or reused for tx, which marks the end of processing
161    /// the marked buffer for the delegated lease.
162    ///
163    /// See [`fidl_fuchsia_hardware_network::DelegatedRxLease`] for more
164    /// details.
165    ///
166    /// # Panics
167    ///
168    /// Panics if the session was not created with
169    /// [`fidl_fuchsia_hardware_network::SessionFlags::RECEIVE_RX_POWER_LEASES`]
170    /// or if `watch_rx_leases` has already been called for this session.
171    pub fn watch_rx_leases(&self) -> impl Stream<Item = Result<RxLease>> + Send + Sync + use<> {
172        let inner = Arc::clone(&self.inner);
173        let watcher = RxLeaseWatcher::new(Arc::clone(&inner.pool));
174        futures::stream::try_unfold((inner, watcher), |(inner, mut watcher)| async move {
175            let DelegatedRxLease {
176                hold_until_frame,
177                handle,
178                __source_breaking: fidl::marker::SourceBreaking,
179            } = match inner.proxy.watch_delegated_rx_lease().await {
180                Ok(lease) => lease,
181                Err(e) => {
182                    if e.is_closed() {
183                        return Ok(None);
184                    } else {
185                        return Err(Error::Fidl(e));
186                    }
187                }
188            };
189            let hold_until_frame = hold_until_frame.ok_or(Error::InvalidLease)?;
190            let handle = RxLease { handle: handle.ok_or(Error::InvalidLease)? };
191
192            watcher.wait_until(hold_until_frame).await;
193            Ok(Some((handle, (inner, watcher))))
194        })
195    }
196
197    /// Closes the session.
198    pub async fn close(&self) -> Result<()> {
199        self.inner.proxy.close()?;
200        // Synchronize by waiting for the channel to be closed.
201        let mut event_stream = self.inner.proxy.take_event_stream();
202        while let Some(event) = event_stream.next().await {
203            match event {
204                Err(fidl::Error::ClientChannelClosed { .. }) => break,
205                Ok(_) => {} // Ignore other events.
206                Err(e) => return Err(Error::Fidl(e)),
207            }
208        }
209        Ok(())
210    }
211}
212
213struct Inner {
214    pool: Arc<Pool>,
215    proxy: netdev::SessionProxy,
216    name: String,
217    rx: fasync::Fifo<DescId<Rx>>,
218    tx: fasync::Fifo<DescId<Tx>>,
219    // Pending tx descriptors to be sent.
220    tx_pending: Pending<Tx>,
221    rx_ready: Mutex<ReadyBuffer<DescId<Rx>>>,
222    tx_ready: Mutex<ReadyBuffer<DescId<Tx>>>,
223    tx_idle_listeners: TxIdleListeners,
224}
225
226impl Inner {
227    /// Creates a new session.
228    async fn new(device: &netdev::DeviceProxy, name: &str, config: Config) -> Result<Arc<Self>> {
229        let (pool, descriptors, data) = Pool::new(config)?;
230
231        let session_info = {
232            // The following two constants are not provided by user, panic
233            // instead of returning an error.
234            let descriptor_length =
235                u8::try_from(NETWORK_DEVICE_DESCRIPTOR_LENGTH / std::mem::size_of::<u64>())
236                    .expect("descriptor length in 64-bit words not representable by u8");
237            let data = vec![fidl_fuchsia_hardware_network::DataVmo {
238                id: Some(0),
239                vmo: Some(data),
240                num_rx_buffers: Some(config.num_rx_buffers.get()),
241                __source_breaking: fidl::marker::SourceBreaking,
242            }];
243            netdev::SessionInfo {
244                descriptors: Some(descriptors),
245                data: Some(data),
246                descriptor_version: Some(NETWORK_DEVICE_DESCRIPTOR_VERSION),
247                descriptor_length: Some(descriptor_length),
248                descriptor_count: Some(config.num_tx_buffers.get() + config.num_rx_buffers.get()),
249                options: Some(config.options),
250                ..Default::default()
251            }
252        };
253
254        let (client, netdev::Fifos { rx, tx }) = device
255            .open_session(name, session_info)
256            .await?
257            .map_err(|raw| Error::Open(name.to_owned(), zx::Status::from_raw(raw)))?;
258        let proxy = client.into_proxy();
259        let rx = fasync::Fifo::from_fifo(rx);
260        let tx = fasync::Fifo::from_fifo(tx);
261
262        Ok(Arc::new(Self {
263            pool,
264            proxy,
265            name: name.to_owned(),
266            rx,
267            tx,
268            tx_pending: Pending::new(Vec::new()),
269            rx_ready: Mutex::new(ReadyBuffer::new(config.num_rx_buffers.get().into())),
270            tx_ready: Mutex::new(ReadyBuffer::new(config.num_tx_buffers.get().into())),
271            tx_idle_listeners: TxIdleListeners::new(),
272        }))
273    }
274
275    /// Polls to submit available rx descriptors from pool to driver.
276    ///
277    /// Returns the number of rx descriptors that are submitted.
278    fn poll_submit_rx(&self, cx: &mut Context<'_>) -> Poll<Result<usize>> {
279        self.pool.rx_pending.poll_submit(&self.rx, cx)
280    }
281
282    /// Polls completed rx descriptors from the driver.
283    ///
284    /// Returns the the head of a completed rx descriptor chain.
285    fn poll_complete_rx(&self, cx: &mut Context<'_>) -> Poll<Result<DescId<Rx>>> {
286        let mut rx_ready = self.rx_ready.lock();
287        rx_ready.poll_with_fifo(cx, &self.rx).map_err(|status| Error::Fifo("read", "rx", status))
288    }
289
290    /// Polls to submit tx descriptors that are pending to the driver.
291    ///
292    /// Returns the number of tx descriptors that are successfully submitted.
293    fn poll_submit_tx(&self, cx: &mut Context<'_>) -> Poll<Result<usize>> {
294        self.tx_pending.poll_submit(&self.tx, cx)
295    }
296
297    /// Polls completed tx descriptors from the driver then puts them in pool.
298    fn poll_complete_tx(&self, cx: &mut Context<'_>) -> Poll<Result<()>> {
299        let result = {
300            let mut tx_ready = self.tx_ready.lock();
301            // TODO(https://github.com/rust-lang/rust/issues/63569): Provide entire
302            // chain of completed descriptors to the pool at once when slice of
303            // MaybeUninit is stabilized.
304            tx_ready.poll_with_fifo(cx, &self.tx).map(|r| match r {
305                Ok(desc) => self.pool.tx_completed(desc),
306                Err(status) => Err(Error::Fifo("read", "tx", status)),
307            })
308        };
309
310        match &result {
311            Poll::Ready(Ok(())) => self.tx_idle_listeners.tx_complete(),
312            Poll::Pending | Poll::Ready(Err(_)) => {}
313        }
314        result
315    }
316
317    /// Sends the [`Buffer`] to the driver.
318    ///
319    /// Note: Transmit is completely infallible because the buffer layout and
320    /// zero-padding are already fully resolved and verified upfront during
321    /// buffer allocation (see `AllocGuard::init` in `pool.rs` for details
322    /// and design tradeoffs).
323    fn send(&self, buffer: Buffer<Tx>) {
324        self.tx_idle_listeners.tx_started();
325        self.tx_pending.extend(std::iter::once(buffer.leak()));
326    }
327
328    /// Receives a [`Buffer`] from the driver.
329    ///
330    /// Waits until there is completed rx buffers from the driver.
331    async fn recv(&self) -> Result<Buffer<Rx>> {
332        poll_fn(|cx| -> Poll<Result<Buffer<Rx>>> {
333            let head = ready!(self.poll_complete_rx(cx))?;
334            Poll::Ready(self.pool.rx_completed(head))
335        })
336        .await
337    }
338}
339
340/// The backing task that drives the session.
341///
342/// A session will stop making progress if this task is not polled continuously.
343#[must_use = "futures do nothing unless you `.await` or poll them"]
344pub struct Task {
345    inner: Arc<Inner>,
346}
347
348impl Future for Task {
349    type Output = Result<()>;
350    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
351        let inner = &Pin::into_inner(self).inner;
352        loop {
353            let mut all_pending = true;
354            // TODO(https://fxbug.dev/42158458): poll once for all completed
355            // descriptors if this becomes a performance bottleneck.
356            while inner.poll_complete_tx(cx)?.is_ready_checked::<()>() {
357                all_pending = false;
358            }
359            if inner.poll_submit_rx(cx)?.is_ready_checked::<usize>() {
360                all_pending = false;
361            }
362            if inner.poll_submit_tx(cx)?.is_ready_checked::<usize>() {
363                all_pending = false;
364            }
365            if all_pending {
366                return Poll::Pending;
367            }
368        }
369    }
370}
371
372/// Session configuration.
373#[derive(Debug, Clone, Copy)]
374pub struct Config {
375    /// Buffer stride on VMO, in bytes.
376    buffer_stride: NonZeroU64,
377    /// Number of rx descriptors to allocate.
378    num_rx_buffers: NonZeroU16,
379    /// Number of tx descriptors to allocate.
380    num_tx_buffers: NonZeroU16,
381    /// Session flags.
382    options: netdev::SessionFlags,
383    /// Buffer layout.
384    buffer_layout: BufferLayout,
385}
386
/// The buffer layout information that [`Pool`] needs to know.
#[derive(Debug, Clone, Copy)]
struct BufferLayout {
    /// Minimum data length of a tx buffer.
    min_tx_data: usize,
    /// Minimum head length of a tx buffer.
    min_tx_head: u16,
    /// Minimum tail length of a tx buffer.
    min_tx_tail: u16,
    /// Length of a single buffer.
    length: usize,
}
399
400/// Network device base info with all required fields.
401#[derive(Debug, Clone, ValidFidlTable)]
402#[fidl_table_src(netdev::DeviceBaseInfo)]
403#[fidl_table_strict]
404pub struct DeviceBaseInfo {
405    /// Maximum number of items in rx FIFO (per session).
406    pub rx_depth: u16,
407    /// Maximum number of items in tx FIFO (per session).
408    pub tx_depth: u16,
409    /// Alignment requirement for buffers in the data VMO.
410    pub buffer_alignment: u32,
411    /// Maximum supported length of buffers in the data VMO, in bytes.
412    #[fidl_field_type(optional)]
413    pub max_buffer_length: Option<NonZeroU32>,
414    /// The minimum rx buffer length required for device.
415    pub min_rx_buffer_length: u32,
416    /// The minimum tx buffer length required for the device.
417    pub min_tx_buffer_length: u32,
418    /// The number of bytes the device requests be free as `head` space in a tx buffer.
419    pub min_tx_buffer_head: u16,
420    /// The amount of bytes the device requests be free as `tail` space in a tx buffer.
421    pub min_tx_buffer_tail: u16,
422    /// Maximum descriptor chain length accepted by the device.
423    pub max_buffer_parts: u8,
424    /// Minimum amount of Rx buffers the client needs to prepare for the
425    /// network device.
426    #[fidl_field_type(optional)]
427    pub min_rx_buffers: Option<NonZeroU16>,
428}
429
430/// Network device information with all required fields.
431#[derive(Debug, Clone, ValidFidlTable)]
432#[fidl_table_src(netdev::DeviceInfo)]
433#[fidl_table_strict]
434pub struct DeviceInfo {
435    /// Minimum descriptor length, in 64-bit words.
436    pub min_descriptor_length: u8,
437    /// Accepted descriptor version.
438    pub descriptor_version: u8,
439    /// Device base info.
440    pub base_info: DeviceBaseInfo,
441}
442
/// The caller-chosen part of a session configuration.
///
/// Combined with a [`DeviceInfo`] through [`DeviceInfo::make_config`] to
/// produce a full [`Config`].
#[derive(Debug, Copy, Clone)]
pub struct DerivableConfig {
    /// Desired default length of the session's buffers.
    pub default_buffer_length: usize,
    /// Whether rx lease watching is enabled.
    pub watch_rx_leases: bool,
}
452
453impl DerivableConfig {
454    /// A sensibly common default buffer length to be used in
455    /// [`DerivableConfig`]. Provided to ease test writing.
456    ///
457    /// Chosen to be the next power of two after the default Ethernet MTU.
458    ///
459    /// This is the value of the buffer length in the `Default` impl.
460    pub const DEFAULT_BUFFER_LENGTH: usize = 2048;
461    /// The value returned by the `Default` impl.
462    pub const DEFAULT: Self =
463        Self { default_buffer_length: Self::DEFAULT_BUFFER_LENGTH, watch_rx_leases: false };
464}
465
466impl Default for DerivableConfig {
467    fn default() -> Self {
468        Self::DEFAULT
469    }
470}
471
472impl DeviceInfo {
473    /// Create a new session config from the device information.
474    ///
475    /// This method also does the boundary checks so that data_length/offset fields read
476    /// from descriptors are safe to convert to [`usize`].
477    pub fn make_config(&self, config: DerivableConfig) -> Result<Config> {
478        let DeviceInfo {
479            min_descriptor_length,
480            descriptor_version,
481            base_info:
482                DeviceBaseInfo {
483                    rx_depth,
484                    tx_depth,
485                    buffer_alignment,
486                    max_buffer_length,
487                    min_rx_buffer_length,
488                    min_tx_buffer_length,
489                    min_tx_buffer_head,
490                    min_tx_buffer_tail,
491                    max_buffer_parts: _,
492                    min_rx_buffers: _,
493                },
494        } = self;
495        if NETWORK_DEVICE_DESCRIPTOR_VERSION != *descriptor_version {
496            return Err(Error::Config(format!(
497                "descriptor version mismatch: {} != {}",
498                NETWORK_DEVICE_DESCRIPTOR_VERSION, descriptor_version
499            )));
500        }
501        if NETWORK_DEVICE_DESCRIPTOR_LENGTH < usize::from(*min_descriptor_length) {
502            return Err(Error::Config(format!(
503                "descriptor length too small: {} < {}",
504                NETWORK_DEVICE_DESCRIPTOR_LENGTH, min_descriptor_length
505            )));
506        }
507
508        let DerivableConfig { default_buffer_length, watch_rx_leases } = config;
509
510        let num_rx_buffers =
511            NonZeroU16::new(*rx_depth).ok_or_else(|| Error::Config("no RX buffers".to_owned()))?;
512        let num_tx_buffers =
513            NonZeroU16::new(*tx_depth).ok_or_else(|| Error::Config("no TX buffers".to_owned()))?;
514
515        let max_buffer_length = max_buffer_length
516            .and_then(|max| {
517                // The error case is the case where max_buffer_length can't fix in a
518                // usize, but we use it to compare it to usizes, so that's
519                // equivalent to no limit.
520                usize::try_from(max.get()).ok_checked::<TryFromIntError>()
521            })
522            .unwrap_or(usize::MAX);
523        let min_buffer_length = usize::try_from(*min_rx_buffer_length)
524            .ok_checked::<TryFromIntError>()
525            .unwrap_or(usize::MAX);
526
527        let buffer_length =
528            usize::min(max_buffer_length, usize::max(min_buffer_length, default_buffer_length));
529
530        let buffer_alignment = usize::try_from(*buffer_alignment).map_err(
531            |std::num::TryFromIntError { .. }| {
532                Error::Config(format!(
533                    "buffer_alignment not representable within usize: {}",
534                    buffer_alignment,
535                ))
536            },
537        )?;
538
539        let buffer_stride = buffer_length
540            .checked_add(buffer_alignment - 1)
541            .map(|x| x / buffer_alignment * buffer_alignment)
542            .ok_or_else(|| {
543                Error::Config(format!(
544                    "not possible to align {} to {} under usize::MAX",
545                    buffer_length, buffer_alignment,
546                ))
547            })?;
548
549        if buffer_stride < buffer_length {
550            return Err(Error::Config(format!(
551                "buffer stride too small {} < {}",
552                buffer_stride, buffer_length
553            )));
554        }
555
556        if buffer_length < usize::from(*min_tx_buffer_head) + usize::from(*min_tx_buffer_tail) {
557            return Err(Error::Config(format!(
558                "buffer length {} does not meet minimum tx buffer head/tail requirement {}/{}",
559                buffer_length, min_tx_buffer_head, min_tx_buffer_tail,
560            )));
561        }
562
563        let num_buffers =
564            rx_depth.checked_add(*tx_depth).filter(|num| *num != u16::MAX).ok_or_else(|| {
565                Error::Config(format!(
566                    "too many buffers requested: {} + {} > u16::MAX",
567                    rx_depth, tx_depth
568                ))
569            })?;
570
571        let buffer_stride =
572            u64::try_from(buffer_stride).map_err(|std::num::TryFromIntError { .. }| {
573                Error::Config(format!("buffer_stride too big: {} > u64::MAX", buffer_stride))
574            })?;
575
576        // This is following the practice of rust stdlib to ensure allocation
577        // size never reaches isize::MAX.
578        // https://doc.rust-lang.org/std/primitive.pointer.html#method.add-1.
579        match buffer_stride.checked_mul(num_buffers.into()).map(isize::try_from) {
580            None | Some(Err(std::num::TryFromIntError { .. })) => {
581                return Err(Error::Config(format!(
582                    "too much memory required for the buffers: {} * {} > isize::MAX",
583                    buffer_stride, num_buffers
584                )));
585            }
586            Some(Ok(_total)) => (),
587        };
588
589        let buffer_stride = NonZeroU64::new(buffer_stride)
590            .ok_or_else(|| Error::Config("buffer_stride is zero".to_owned()))?;
591
592        let min_tx_data = match usize::try_from(*min_tx_buffer_length)
593            .map(|min_tx| (min_tx <= buffer_length).then_some(min_tx))
594        {
595            Ok(Some(min_tx_buffer_length)) => min_tx_buffer_length,
596            // Either the conversion or the comparison failed.
597            Ok(None) | Err(std::num::TryFromIntError { .. }) => {
598                return Err(Error::Config(format!(
599                    "buffer_length smaller than minimum TX requirement: {} < {}",
600                    buffer_length, *min_tx_buffer_length
601                )));
602            }
603        };
604
605        let mut options = netdev::SessionFlags::empty();
606        options.set(netdev::SessionFlags::RECEIVE_RX_POWER_LEASES, watch_rx_leases);
607
608        Ok(Config {
609            buffer_stride,
610            num_rx_buffers,
611            num_tx_buffers,
612            options,
613            buffer_layout: BufferLayout {
614                length: buffer_length,
615                min_tx_head: *min_tx_buffer_head,
616                min_tx_tail: *min_tx_buffer_tail,
617                min_tx_data,
618            },
619        })
620    }
621}
622
/// Identifies a port of the device.
///
/// Built from a [`netdev::PortId`] through `TryFrom`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Port {
    /// The `base` field of the originating [`netdev::PortId`].
    pub(crate) base: u8,
    /// The `salt` field of the originating [`netdev::PortId`].
    pub(crate) salt: u8,
}
629
630impl TryFrom<netdev::PortId> for Port {
631    type Error = Error;
632    fn try_from(netdev::PortId { base, salt }: netdev::PortId) -> Result<Self> {
633        if base <= netdev::MAX_PORTS {
634            Ok(Self { base, salt })
635        } else {
636            Err(Error::InvalidPortId(base))
637        }
638    }
639}
640
641impl From<Port> for netdev::PortId {
642    fn from(Port { base, salt }: Port) -> Self {
643        Self { base, salt }
644    }
645}
646
647/// Pending descriptors to be sent to driver.
648struct Pending<K: AllocKind> {
649    inner: Mutex<(Vec<DescId<K>>, Option<Waker>)>,
650}
651
652impl<K: AllocKind> Pending<K> {
653    fn new(descs: Vec<DescId<K>>) -> Self {
654        Self { inner: Mutex::new((descs, None)) }
655    }
656
657    /// Extends the pending descriptors buffer.
658    fn extend(&self, descs: impl IntoIterator<Item = DescId<K>>) {
659        let mut guard = self.inner.lock();
660        let (storage, waker) = &mut *guard;
661        storage.extend(descs);
662        if let Some(waker) = waker.take() {
663            waker.wake();
664        }
665    }
666
667    /// Submits the pending buffer to the driver through [`zx::Fifo`].
668    ///
669    /// It will return [`Poll::Pending`] if any of the following happens:
670    ///   - There are no descriptors pending.
671    ///   - The fifo is not ready for write.
672    fn poll_submit(
673        &self,
674        fifo: &fasync::Fifo<DescId<K>>,
675        cx: &mut Context<'_>,
676    ) -> Poll<Result<usize>> {
677        let mut guard = self.inner.lock();
678        let (storage, waker) = &mut *guard;
679        if storage.is_empty() {
680            *waker = Some(cx.waker().clone());
681            return Poll::Pending;
682        }
683
684        // TODO(https://fxbug.dev/42107145): We're assuming that writing to the
685        // FIFO here is a sufficient memory barrier for the other end to access
686        // the data. That is currently true but not really guaranteed by the
687        // API.
688        let submitted = ready!(fifo.try_write(cx, &storage[..]))
689            .map_err(|status| Error::Fifo("write", K::REFL.as_str(), status))?;
690        let _drained = storage.drain(0..submitted);
691        Poll::Ready(Ok(submitted))
692    }
693}
694
/// A staging buffer that proxies reads from a FIFO to reduce syscall
/// overhead.
///
/// `ReadyBuffer` keeps entries read from a FIFO in pre-allocated memory, so
/// the batch size acquired from the FIFO can differ from the batch size the
/// caller processes.
struct ReadyBuffer<T> {
    // NB: Storing `MaybeUninit` lets the FIFO object see a transparent memory
    // layout while still allowing entries to be moved out of the buffer
    // without requiring `T: Default`. Skipping memory initialization on
    // creation is a small extra benefit, though mostly negligible since all
    // allocation happens upfront.
    data: Vec<MaybeUninit<T>>,
    // The range of `data` that currently holds initialized entries.
    available: Range<usize>,
}
710
711impl<T> Drop for ReadyBuffer<T> {
712    fn drop(&mut self) {
713        let Self { data, available } = self;
714        for initialized in &mut data[available.clone()] {
715            // SAFETY: the available range keeps track of initialized buffers,
716            // we must drop them on drop to uphold `MaybeUninit` expectations.
717            unsafe { initialized.assume_init_drop() }
718        }
719        *available = 0..0;
720    }
721}
722
723impl<T> ReadyBuffer<T> {
724    fn new(capacity: usize) -> Self {
725        let data = std::iter::from_fn(|| Some(MaybeUninit::uninit())).take(capacity).collect();
726        Self { data, available: 0..0 }
727    }
728
729    fn poll_with_fifo(
730        &mut self,
731        cx: &mut Context<'_>,
732        fifo: &fuchsia_async::Fifo<T>,
733    ) -> Poll<std::result::Result<T, zx::Status>>
734    where
735        T: fasync::FifoEntry,
736    {
737        let Self { data, available: Range { start, end } } = self;
738
739        loop {
740            // Always pop from available data first.
741            if *start != *end {
742                let desc = std::mem::replace(&mut data[*start], MaybeUninit::uninit());
743                *start += 1;
744                // SAFETY: Descriptor was in the initialized section, it was
745                // initialized.
746                let desc = unsafe { desc.assume_init() };
747                return Poll::Ready(Ok(desc));
748            }
749            // Fetch more from the FIFO.
750            let count = ready!(fifo.try_read(cx, &mut data[..]))?;
751            *start = 0;
752            *end = count;
753        }
754    }
755}
756
757struct TxIdleListeners {
758    event: event_listener::Event,
759    tx_in_flight: AtomicUsize,
760}
761
762impl TxIdleListeners {
763    fn new() -> Self {
764        Self { event: event_listener::Event::new(), tx_in_flight: AtomicUsize::new(0) }
765    }
766
767    /// Decreases the number of outstanding tx buffers by 1.
768    ///
769    /// Notifies any tx idle listeners if the number reaches 0.
770    fn tx_complete(&self) {
771        let Self { event, tx_in_flight } = self;
772        let old_value = tx_in_flight.fetch_sub(1, atomic::Ordering::SeqCst);
773        debug_assert_ne!(old_value, 0);
774        if old_value == 1 {
775            let _notified: usize = event.notify(usize::MAX);
776        }
777    }
778
779    /// Increases the number of outstanding tx buffers by 1.
780    fn tx_started(&self) {
781        let Self { event: _, tx_in_flight } = self;
782        let _: usize = tx_in_flight.fetch_add(1, atomic::Ordering::SeqCst);
783    }
784
785    async fn wait(&self) {
786        let Self { event, tx_in_flight } = self;
787        // This is _the correct way_ of holding an `event_listener::Listener`.
788        // We check the condition before installing the listener in the fast
789        // case, then we must check the condition again after creating the
790        // listener in case we've raced with the condition updating. Finally we
791        // must loop and check the condition again because we're not fully
792        // guaranteed to not have spurious wakeups.
793        //
794        // See the event_listener crate documentation for more details.
795        loop {
796            if tx_in_flight.load(atomic::Ordering::SeqCst) == 0 {
797                return;
798            }
799
800            event_listener::listener!(event => listener);
801
802            if tx_in_flight.load(atomic::Ordering::SeqCst) == 0 {
803                return;
804            }
805
806            listener.await;
807        }
808    }
809}
810
811/// An RAII lease possibly keeping the system from suspension.
812///
813/// Yielded from [`Session::watch_rx_leases`].
814///
815/// Dropping an `RxLease` relinquishes it.
816#[derive(Debug)]
817pub struct RxLease {
818    handle: netdev::DelegatedRxLeaseHandle,
819}
820
821impl Drop for RxLease {
822    fn drop(&mut self) {
823        let Self { handle } = self;
824        // Change detector in case we need any evolution on how to relinquish
825        // leases.
826        match handle {
827            netdev::DelegatedRxLeaseHandle::Channel(_channel) => {
828                // Dropping the channel is enough to relinquish the lease.
829            }
830            netdev::DelegatedRxLeaseHandle::Eventpair(_eventpair) => {
831                // Dropping the eventpair is enough to relinquish the lease.
832            }
833            netdev::DelegatedRxLeaseHandle::__SourceBreaking { .. } => {}
834        }
835    }
836}
837
838impl RxLease {
839    /// Peeks the internal lease.
840    pub fn inner(&self) -> &netdev::DelegatedRxLeaseHandle {
841        &self.handle
842    }
843}
844
845#[cfg(test)]
846mod tests {
847    use std::num::NonZeroU32;
848    use std::ops::Deref;
849    use std::sync::Arc;
850    use std::task::Poll;
851
852    use assert_matches::assert_matches;
853    use fuchsia_async::Fifo;
854    use test_case::test_case;
855    use zerocopy::{FromBytes, Immutable, IntoBytes};
856
857    use crate::session::DerivableConfig;
858
859    use super::buffer::{
860        AllocKind, DescId, NETWORK_DEVICE_DESCRIPTOR_LENGTH, NETWORK_DEVICE_DESCRIPTOR_VERSION,
861    };
862    use super::{
863        BufferLayout, Config, DeviceBaseInfo, DeviceInfo, Error, Inner, Mutex, Pending, Pool,
864        ReadyBuffer, Task, TxIdleListeners,
865    };
866
867    const DEFAULT_DEVICE_BASE_INFO: DeviceBaseInfo = DeviceBaseInfo {
868        rx_depth: 1,
869        tx_depth: 1,
870        buffer_alignment: 1,
871        max_buffer_length: None,
872        min_rx_buffer_length: 0,
873        min_tx_buffer_head: 0,
874        min_tx_buffer_length: 0,
875        min_tx_buffer_tail: 0,
876        max_buffer_parts: fidl_fuchsia_hardware_network::MAX_DESCRIPTOR_CHAIN,
877        min_rx_buffers: None,
878    };
879
880    const DEFAULT_DEVICE_INFO: DeviceInfo = DeviceInfo {
881        min_descriptor_length: 0,
882        descriptor_version: 1,
883        base_info: DEFAULT_DEVICE_BASE_INFO,
884    };
885
886    const DEFAULT_BUFFER_LENGTH: usize = 2048;
887
888    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
889        min_descriptor_length: u8::MAX,
890        ..DEFAULT_DEVICE_INFO
891    }, format!("descriptor length too small: {} < {}", NETWORK_DEVICE_DESCRIPTOR_LENGTH, u8::MAX))]
892    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
893        descriptor_version: 42,
894        ..DEFAULT_DEVICE_INFO
895    }, format!("descriptor version mismatch: {} != {}", NETWORK_DEVICE_DESCRIPTOR_VERSION, 42))]
896    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
897        base_info: DeviceBaseInfo {
898            tx_depth: 0,
899            ..DEFAULT_DEVICE_BASE_INFO
900        },
901        ..DEFAULT_DEVICE_INFO
902    }, "no TX buffers")]
903    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
904        base_info: DeviceBaseInfo {
905            rx_depth: 0,
906            ..DEFAULT_DEVICE_BASE_INFO
907        },
908        ..DEFAULT_DEVICE_INFO
909    }, "no RX buffers")]
910    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
911        base_info: DeviceBaseInfo {
912            tx_depth: u16::MAX,
913            rx_depth: u16::MAX,
914            ..DEFAULT_DEVICE_BASE_INFO
915        },
916        ..DEFAULT_DEVICE_INFO
917    }, format!("too many buffers requested: {} + {} > u16::MAX", u16::MAX, u16::MAX))]
918    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
919        base_info: DeviceBaseInfo {
920            min_tx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 + 1,
921            ..DEFAULT_DEVICE_BASE_INFO
922        },
923        ..DEFAULT_DEVICE_INFO
924    }, format!(
925        "buffer_length smaller than minimum TX requirement: {} < {}",
926        DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
927    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
928        base_info: DeviceBaseInfo {
929            min_tx_buffer_head: DEFAULT_BUFFER_LENGTH as u16 + 1,
930            ..DEFAULT_DEVICE_BASE_INFO
931        },
932        ..DEFAULT_DEVICE_INFO
933    }, format!(
934        "buffer length {} does not meet minimum tx buffer head/tail requirement {}/0",
935        DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
936    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
937        base_info: DeviceBaseInfo {
938            min_tx_buffer_tail: DEFAULT_BUFFER_LENGTH as u16 + 1,
939            ..DEFAULT_DEVICE_BASE_INFO
940        },
941        ..DEFAULT_DEVICE_INFO
942    }, format!(
943        "buffer length {} does not meet minimum tx buffer head/tail requirement 0/{}",
944        DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
945    #[test_case(0, DEFAULT_DEVICE_INFO, "buffer_stride is zero")]
946    #[test_case(usize::MAX, DEFAULT_DEVICE_INFO,
947    format!(
948        "too much memory required for the buffers: {} * {} > isize::MAX",
949        usize::MAX, 2))]
950    #[test_case(usize::MAX, DeviceInfo {
951        base_info: DeviceBaseInfo {
952            buffer_alignment: 2,
953            ..DEFAULT_DEVICE_BASE_INFO
954        },
955        ..DEFAULT_DEVICE_INFO
956    }, format!(
957        "not possible to align {} to {} under usize::MAX",
958        usize::MAX, 2))]
959    fn configs_from_device_info_err(
960        buffer_length: usize,
961        info: DeviceInfo,
962        expected: impl Deref<Target = str>,
963    ) {
964        let config = DerivableConfig { default_buffer_length: buffer_length, ..Default::default() };
965        assert_matches!(
966            info.make_config(config),
967            Err(Error::Config(got)) if got.as_str() == expected.deref()
968        );
969    }
970
971    #[test_case(DeviceInfo {
972        base_info: DeviceBaseInfo {
973            min_rx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 + 1,
974            ..DEFAULT_DEVICE_BASE_INFO
975        },
976        ..DEFAULT_DEVICE_INFO
977    }, DEFAULT_BUFFER_LENGTH + 1; "default below min")]
978    #[test_case(DeviceInfo {
979        base_info: DeviceBaseInfo {
980            max_buffer_length: NonZeroU32::new(DEFAULT_BUFFER_LENGTH as u32 - 1),
981            ..DEFAULT_DEVICE_BASE_INFO
982        },
983        ..DEFAULT_DEVICE_INFO
984    }, DEFAULT_BUFFER_LENGTH - 1; "default above max")]
985    #[test_case(DeviceInfo {
986        base_info: DeviceBaseInfo {
987            min_rx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 - 1,
988            max_buffer_length: NonZeroU32::new(DEFAULT_BUFFER_LENGTH as u32 + 1),
989            ..DEFAULT_DEVICE_BASE_INFO
990        },
991        ..DEFAULT_DEVICE_INFO
992    }, DEFAULT_BUFFER_LENGTH; "default in bounds")]
993    fn configs_from_device_buffer_length(info: DeviceInfo, expected_length: usize) {
994        let config = info
995            .make_config(DerivableConfig {
996                default_buffer_length: DEFAULT_BUFFER_LENGTH,
997                ..Default::default()
998            })
999            .expect("is valid");
1000        let Config {
1001            buffer_layout: BufferLayout { length, min_tx_data: _, min_tx_head: _, min_tx_tail: _ },
1002            buffer_stride: _,
1003            num_rx_buffers: _,
1004            num_tx_buffers: _,
1005            options: _,
1006        } = config;
1007        assert_eq!(length, expected_length);
1008    }
1009
1010    fn make_fifos<K: AllocKind>() -> (Fifo<DescId<K>>, zx::Fifo<DescId<K>>) {
1011        let (handle, other_end) = zx::Fifo::create(1).unwrap();
1012        (Fifo::from_fifo(handle), other_end)
1013    }
1014
1015    fn remove_rights<T: FromBytes + IntoBytes + Immutable>(
1016        fifo: Fifo<T>,
1017        rights_to_remove: zx::Rights,
1018    ) -> Fifo<T> {
1019        let fifo = zx::Fifo::from(fifo);
1020        let rights = fifo.as_handle_ref().basic_info().expect("can retrieve info").rights;
1021
1022        let fifo = fifo.replace_handle(rights ^ rights_to_remove).expect("can replace");
1023        Fifo::from_fifo(fifo)
1024    }
1025
1026    enum TxOrRx {
1027        Tx,
1028        Rx,
1029    }
1030    #[test_case(TxOrRx::Tx, zx::Rights::READ; "tx read")]
1031    #[test_case(TxOrRx::Tx, zx::Rights::WRITE; "tx write")]
1032    #[test_case(TxOrRx::Rx, zx::Rights::WRITE; "rx read")]
1033    #[fuchsia_async::run_singlethreaded(test)]
1034    async fn task_as_future_poll_error(which_fifo: TxOrRx, right_to_remove: zx::Rights) {
1035        // This is a regression test for https://fxbug.dev/42072513. The flake
1036        // that caused that bug occurred because the Zircon channel was closed
1037        // but the error returned by a failed attempt to write to it wasn't
1038        // being propagated upwards. This test produces a similar situation by
1039        // altering the right on the FIFOs the task uses so as to cause either
1040        // an attempt to write or to read to fail. For completeness, it
1041        // exercises all the FIFO polls that comprise Task::poll.
1042        let (pool, _descriptors, _data) = Pool::new(
1043            DEFAULT_DEVICE_INFO
1044                .make_config(DerivableConfig {
1045                    default_buffer_length: DEFAULT_BUFFER_LENGTH,
1046                    ..Default::default()
1047                })
1048                .expect("is valid"),
1049        )
1050        .expect("is valid");
1051        let (session_proxy, _session_server) =
1052            fidl::endpoints::create_proxy::<fidl_fuchsia_hardware_network::SessionMarker>();
1053
1054        let (rx, _rx_sender) = make_fifos();
1055        let (tx, _tx_receiver) = make_fifos();
1056
1057        // Attenuate rights on one of the FIFOs.
1058        let (tx, rx) = match which_fifo {
1059            TxOrRx::Tx => (remove_rights(tx, right_to_remove), rx),
1060            TxOrRx::Rx => (tx, remove_rights(rx, right_to_remove)),
1061        };
1062
1063        let buf = pool.alloc_tx_buffer(1).await.expect("can allocate");
1064        let inner = Arc::new(Inner {
1065            pool,
1066            proxy: session_proxy,
1067            name: "fake_task".to_string(),
1068            rx,
1069            tx,
1070            tx_pending: Pending::new(vec![]),
1071            rx_ready: Mutex::new(ReadyBuffer::new(10)),
1072            tx_ready: Mutex::new(ReadyBuffer::new(10)),
1073            tx_idle_listeners: TxIdleListeners::new(),
1074        });
1075
1076        inner.send(buf);
1077
1078        let mut task = Task { inner };
1079
1080        // The task should not be able to continue because it can't read from or
1081        // write to one of the FIFOs.
1082        assert_matches!(futures::poll!(&mut task), Poll::Ready(Err(Error::Fifo(_, _, _))));
1083    }
1084}