// netdevice_client/session/mod.rs

1// Copyright 2021 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Fuchsia netdevice session.
6
7mod buffer;
8
9use std::fmt::Debug;
10use std::mem::MaybeUninit;
11use std::num::{NonZeroU16, NonZeroU32, NonZeroU64, TryFromIntError};
12use std::ops::Range;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::sync::atomic::{self, AtomicUsize};
16use std::task::Waker;
17
18use explicit::{PollExt as _, ResultExt as _};
19use fidl_fuchsia_hardware_network as netdev;
20use fidl_fuchsia_hardware_network::DelegatedRxLease;
21use fidl_table_validation::ValidFidlTable;
22use fuchsia_async as fasync;
23use fuchsia_sync::Mutex;
24use futures::future::{Future, poll_fn};
25use futures::task::{Context, Poll};
26use futures::{Stream, StreamExt as _, ready};
27
28use crate::error::{Error, Result};
29use buffer::pool::{Pool, RxLeaseWatcher};
30use buffer::{
31    AllocKind, DescId, NETWORK_DEVICE_DESCRIPTOR_LENGTH, NETWORK_DEVICE_DESCRIPTOR_VERSION,
32};
33pub use buffer::{Buffer, Rx, Tx};
34
/// A session between network device client and driver.
///
/// Cloning a `Session` is cheap: all clones share the same underlying state.
#[derive(Clone)]
pub struct Session {
    // Shared with the [`Task`] returned by [`Session::new`], which drives the
    // session forward.
    inner: Arc<Inner>,
}
40
41impl Debug for Session {
42    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43        let Self { inner } = self;
44        let Inner {
45            name,
46            pool: _,
47            proxy: _,
48            rx: _,
49            tx: _,
50            tx_pending: _,
51            rx_ready: _,
52            tx_ready: _,
53            tx_idle_listeners: _,
54        } = &**inner;
55        f.debug_struct("Session").field("name", &name).finish_non_exhaustive()
56    }
57}
58
impl Session {
    /// Creates a new session with the given `name` and `config`.
    ///
    /// Returns the session handle along with its backing [`Task`]; the task
    /// must be polled continuously for the session to make progress.
    pub async fn new(
        device: &netdev::DeviceProxy,
        name: &str,
        config: Config,
    ) -> Result<(Self, Task)> {
        let inner = Inner::new(device, name, config).await?;
        Ok((Session { inner: Arc::clone(&inner) }, Task { inner }))
    }

    /// Sends a [`Buffer`] to the network device in this session.
    ///
    /// The buffer is queued locally; the actual FIFO write to the driver
    /// happens when the backing [`Task`] is polled.
    pub fn send(&self, buffer: Buffer<Tx>) -> Result<()> {
        self.inner.send(buffer)
    }

    /// Receives a [`Buffer`] from the network device in this session.
    pub async fn recv(&self) -> Result<Buffer<Rx>> {
        self.inner.recv().await
    }

    /// Allocates a [`Buffer`] that may later be queued to the network device.
    ///
    /// The returned buffer will have at least `num_bytes` as size.
    pub async fn alloc_tx_buffer(&self, num_bytes: usize) -> Result<Buffer<Tx>> {
        self.inner.pool.alloc_tx_buffer(num_bytes).await
    }

    /// Waits for at least one TX buffer to be available and returns an iterator
    /// of buffers with `num_bytes` as capacity.
    ///
    /// The returned iterator is guaranteed to yield at least one item (though
    /// it might be an error if the requested size cannot meet the device
    /// requirement).
    ///
    /// # Note
    ///
    /// Given a `Buffer<Tx>` is returned to the pool when it's dropped, the
    /// returned iterator will seemingly yield infinite items if the yielded
    /// `Buffer`s are dropped while iterating.
    pub async fn alloc_tx_buffers(
        &self,
        num_bytes: usize,
    ) -> Result<impl Iterator<Item = Result<Buffer<Tx>>> + '_> {
        self.inner.pool.alloc_tx_buffers(num_bytes).await
    }

    /// Attaches [`Session`] to a port.
    ///
    /// `rx_frames` lists the frame types the session wants to receive on this
    /// port.
    pub async fn attach(&self, port: Port, rx_frames: &[netdev::FrameType]) -> Result<()> {
        // NB: Need to bind the future returned by `proxy.attach` to a variable
        // otherwise this function's (`Session::attach`) returned future becomes
        // not `Send` and we get unexpected compiler errors at a distance.
        //
        // The dyn borrow in the signature of `proxy.attach` seems to be the
        // cause of the compiler's confusion.
        let fut = self.inner.proxy.attach(&port.into(), rx_frames);
        let () = fut.await?.map_err(|raw| Error::Attach(port, zx::Status::from_raw(raw)))?;
        Ok(())
    }

    /// Detaches a port from the [`Session`].
    pub async fn detach(&self, port: Port) -> Result<()> {
        let () = self
            .inner
            .proxy
            .detach(&port.into())
            .await?
            .map_err(|raw| Error::Detach(port, zx::Status::from_raw(raw)))?;
        Ok(())
    }

    /// Blocks until there are no more tx buffers in flight to the backing
    /// device.
    ///
    /// Note that this method does not prevent new buffers from being allocated
    /// and sent, it is up to the caller to prevent any races. This future will
    /// resolve as soon as it observes a tx idle event. That is, there are no
    /// frames in flight to the backing device at all and the session currently
    /// owns all allocated tx buffers.
    ///
    /// The synchronization guarantee provided by this method is that any
    /// buffers previously given to [`Session::send`] will be accounted as
    /// pending until the device has replied back.
    pub async fn wait_tx_idle(&self) {
        self.inner.tx_idle_listeners.wait().await;
    }

    /// Returns a stream of delegated rx leases from the device.
    ///
    /// Leases are yielded from the stream whenever the corresponding receive
    /// buffer is dropped or reused for tx, which marks the end of processing
    /// the marked buffer for the delegated lease.
    ///
    /// See [`fidl_fuchsia_hardware_network::DelegatedRxLease`] for more
    /// details.
    ///
    /// # Panics
    ///
    /// Panics if the session was not created with
    /// [`fidl_fuchsia_hardware_network::SessionFlags::RECEIVE_RX_POWER_LEASES`]
    /// or if `watch_rx_leases` has already been called for this session.
    pub fn watch_rx_leases(&self) -> impl Stream<Item = Result<RxLease>> + Send + Sync + use<> {
        let inner = Arc::clone(&self.inner);
        let watcher = RxLeaseWatcher::new(Arc::clone(&inner.pool));
        futures::stream::try_unfold((inner, watcher), |(inner, mut watcher)| async move {
            let DelegatedRxLease {
                hold_until_frame,
                handle,
                __source_breaking: fidl::marker::SourceBreaking,
            } = match inner.proxy.watch_delegated_rx_lease().await {
                Ok(lease) => lease,
                Err(e) => {
                    // A closed channel terminates the stream cleanly; any
                    // other FIDL error is surfaced to the caller.
                    if e.is_closed() {
                        return Ok(None);
                    } else {
                        return Err(Error::Fidl(e));
                    }
                }
            };
            let hold_until_frame = hold_until_frame.ok_or(Error::InvalidLease)?;
            let handle = RxLease { handle: handle.ok_or(Error::InvalidLease)? };

            // Hold the lease back until the frame it refers to has been fully
            // processed before yielding it to the stream's consumer.
            watcher.wait_until(hold_until_frame).await;
            Ok(Some((handle, (inner, watcher))))
        })
    }

    /// Closes the session.
    ///
    /// Resolves only after the driver has acknowledged the close by dropping
    /// its end of the channel.
    pub async fn close(&self) -> Result<()> {
        self.inner.proxy.close()?;
        // Synchronize by waiting for the channel to be closed.
        let mut event_stream = self.inner.proxy.take_event_stream();
        while let Some(event) = event_stream.next().await {
            match event {
                Err(fidl::Error::ClientChannelClosed { .. }) => break,
                Ok(_) => {} // Ignore other events.
                Err(e) => return Err(Error::Fidl(e)),
            }
        }
        Ok(())
    }
}
201
/// Shared state between a [`Session`] handle and its driving [`Task`].
struct Inner {
    /// Buffer pool shared with the driver via VMO; provides rx/tx buffers.
    pool: Arc<Pool>,
    /// FIDL proxy for the driver-side session protocol.
    proxy: netdev::SessionProxy,
    /// Session name; surfaced in `Debug` output and error messages.
    name: String,
    /// FIFO over which rx descriptors are exchanged with the driver.
    rx: fasync::Fifo<DescId<Rx>>,
    /// FIFO over which tx descriptors are exchanged with the driver.
    tx: fasync::Fifo<DescId<Tx>>,
    // Pending tx descriptors to be sent.
    tx_pending: Pending<Tx>,
    /// Batching proxy buffer for descriptors read back from the rx FIFO.
    rx_ready: Mutex<ReadyBuffer<DescId<Rx>>>,
    /// Batching proxy buffer for descriptors read back from the tx FIFO.
    tx_ready: Mutex<ReadyBuffer<DescId<Tx>>>,
    /// Tracks in-flight tx buffers and wakes `wait_tx_idle` waiters.
    tx_idle_listeners: TxIdleListeners,
}
214
impl Inner {
    /// Creates a new session.
    ///
    /// Allocates the buffer [`Pool`], opens the driver-side session via
    /// `device`, and wires up the rx/tx FIFOs.
    async fn new(device: &netdev::DeviceProxy, name: &str, config: Config) -> Result<Arc<Self>> {
        let (pool, descriptors, data) = Pool::new(config)?;

        let session_info = {
            // The following two constants are not provided by user, panic
            // instead of returning an error.
            let descriptor_length =
                u8::try_from(NETWORK_DEVICE_DESCRIPTOR_LENGTH / std::mem::size_of::<u64>())
                    .expect("descriptor length in 64-bit words not representable by u8");
            let data = vec![fidl_fuchsia_hardware_network::DataVmo {
                id: Some(0),
                vmo: Some(data),
                num_rx_buffers: Some(config.num_rx_buffers.get()),
                __source_breaking: fidl::marker::SourceBreaking,
            }];
            netdev::SessionInfo {
                descriptors: Some(descriptors),
                data: Some(data),
                descriptor_version: Some(NETWORK_DEVICE_DESCRIPTOR_VERSION),
                descriptor_length: Some(descriptor_length),
                // NOTE(review): the sum is assumed not to overflow u16; the
                // rx/tx depths are validated together in
                // `DeviceInfo::make_config` before a `Config` exists.
                descriptor_count: Some(config.num_tx_buffers.get() + config.num_rx_buffers.get()),
                options: Some(config.options),
                ..Default::default()
            }
        };

        let (client, netdev::Fifos { rx, tx }) = device
            .open_session(name, session_info)
            .await?
            .map_err(|raw| Error::Open(name.to_owned(), zx::Status::from_raw(raw)))?;
        let proxy = client.into_proxy();
        let rx = fasync::Fifo::from_fifo(rx);
        let tx = fasync::Fifo::from_fifo(tx);

        Ok(Arc::new(Self {
            pool,
            proxy,
            name: name.to_owned(),
            rx,
            tx,
            tx_pending: Pending::new(Vec::new()),
            rx_ready: Mutex::new(ReadyBuffer::new(config.num_rx_buffers.get().into())),
            tx_ready: Mutex::new(ReadyBuffer::new(config.num_tx_buffers.get().into())),
            tx_idle_listeners: TxIdleListeners::new(),
        }))
    }

    /// Polls to submit available rx descriptors from pool to driver.
    ///
    /// Returns the number of rx descriptors that are submitted.
    fn poll_submit_rx(&self, cx: &mut Context<'_>) -> Poll<Result<usize>> {
        self.pool.rx_pending.poll_submit(&self.rx, cx)
    }

    /// Polls completed rx descriptors from the driver.
    ///
    /// Returns the head of a completed rx descriptor chain.
    fn poll_complete_rx(&self, cx: &mut Context<'_>) -> Poll<Result<DescId<Rx>>> {
        let mut rx_ready = self.rx_ready.lock();
        rx_ready.poll_with_fifo(cx, &self.rx).map_err(|status| Error::Fifo("read", "rx", status))
    }

    /// Polls to submit tx descriptors that are pending to the driver.
    ///
    /// Returns the number of tx descriptors that are successfully submitted.
    fn poll_submit_tx(&self, cx: &mut Context<'_>) -> Poll<Result<usize>> {
        self.tx_pending.poll_submit(&self.tx, cx)
    }

    /// Polls completed tx descriptors from the driver then puts them in pool.
    fn poll_complete_tx(&self, cx: &mut Context<'_>) -> Poll<Result<()>> {
        let result = {
            let mut tx_ready = self.tx_ready.lock();
            // TODO(https://github.com/rust-lang/rust/issues/63569): Provide entire
            // chain of completed descriptors to the pool at once when slice of
            // MaybeUninit is stabilized.
            tx_ready.poll_with_fifo(cx, &self.tx).map(|r| match r {
                Ok(desc) => self.pool.tx_completed(desc),
                Err(status) => Err(Error::Fifo("read", "tx", status)),
            })
        };

        // Notify idle listeners only after the tx_ready lock is released and
        // only on a successful completion.
        match &result {
            Poll::Ready(Ok(())) => self.tx_idle_listeners.tx_complete(),
            Poll::Pending | Poll::Ready(Err(_)) => {}
        }
        result
    }

    /// Sends the [`Buffer`] to the driver.
    ///
    /// The in-flight counter is incremented before the descriptor is queued so
    /// that `wait_tx_idle` never misses a buffer that was handed to `send`.
    fn send(&self, mut buffer: Buffer<Tx>) -> Result<()> {
        buffer.pad()?;
        buffer.commit();
        self.tx_idle_listeners.tx_started();
        self.tx_pending.extend(std::iter::once(buffer.leak()));
        Ok(())
    }

    /// Receives a [`Buffer`] from the driver.
    ///
    /// Waits until there is completed rx buffers from the driver.
    async fn recv(&self) -> Result<Buffer<Rx>> {
        poll_fn(|cx| -> Poll<Result<Buffer<Rx>>> {
            let head = ready!(self.poll_complete_rx(cx))?;
            Poll::Ready(self.pool.rx_completed(head))
        })
        .await
    }
}
326
/// The backing task that drives the session.
///
/// A session will stop making progress if this task is not polled continuously.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Task {
    // Same shared state as the [`Session`] this task was created with.
    inner: Arc<Inner>,
}
334
impl Future for Task {
    type Output = Result<()>;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let inner = &Pin::into_inner(self).inner;
        // Drive all three pumps (tx completion, rx submission, tx submission)
        // repeatedly; progress on one may unblock another. Only return
        // `Pending` once every sub-poll reported pending in the same pass,
        // since by then each of them has registered a waker with `cx`.
        loop {
            let mut all_pending = true;
            // TODO(https://fxbug.dev/42158458): poll once for all completed
            // descriptors if this becomes a performance bottleneck.
            while inner.poll_complete_tx(cx)?.is_ready_checked::<()>() {
                all_pending = false;
            }
            if inner.poll_submit_rx(cx)?.is_ready_checked::<usize>() {
                all_pending = false;
            }
            if inner.poll_submit_tx(cx)?.is_ready_checked::<usize>() {
                all_pending = false;
            }
            if all_pending {
                return Poll::Pending;
            }
        }
    }
}
358
/// Session configuration.
///
/// Constructed from device-reported limits via [`DeviceInfo::make_config`].
#[derive(Debug, Clone, Copy)]
pub struct Config {
    /// Buffer stride on VMO, in bytes.
    buffer_stride: NonZeroU64,
    /// Number of rx descriptors to allocate.
    num_rx_buffers: NonZeroU16,
    /// Number of tx descriptors to allocate.
    num_tx_buffers: NonZeroU16,
    /// Session flags.
    options: netdev::SessionFlags,
    /// Buffer layout.
    buffer_layout: BufferLayout,
}
373
/// Describes the buffer layout that [`Pool`] needs to know.
#[derive(Debug, Clone, Copy)]
struct BufferLayout {
    /// Minimum tx buffer data length.
    min_tx_data: usize,
    /// Minimum tx buffer head length.
    min_tx_head: u16,
    /// Minimum tx buffer tail length.
    min_tx_tail: u16,
    /// The length of a buffer, in bytes.
    length: usize,
}
386
/// Network device base info with all required fields.
///
/// Validated mirror of [`netdev::DeviceBaseInfo`]; conversion fails if a
/// required field is absent (see [`ValidFidlTable`]).
#[derive(Debug, Clone, ValidFidlTable)]
#[fidl_table_src(netdev::DeviceBaseInfo)]
#[fidl_table_strict]
pub struct DeviceBaseInfo {
    /// Maximum number of items in rx FIFO (per session).
    pub rx_depth: u16,
    /// Maximum number of items in tx FIFO (per session).
    pub tx_depth: u16,
    /// Alignment requirement for buffers in the data VMO.
    pub buffer_alignment: u32,
    /// Maximum supported length of buffers in the data VMO, in bytes.
    #[fidl_field_type(optional)]
    pub max_buffer_length: Option<NonZeroU32>,
    /// The minimum rx buffer length required for device.
    pub min_rx_buffer_length: u32,
    /// The minimum tx buffer length required for the device.
    pub min_tx_buffer_length: u32,
    /// The number of bytes the device requests be free as `head` space in a tx buffer.
    pub min_tx_buffer_head: u16,
    /// The amount of bytes the device requests be free as `tail` space in a tx buffer.
    pub min_tx_buffer_tail: u16,
    /// Maximum descriptor chain length accepted by the device.
    pub max_buffer_parts: u8,
    /// Minimum amount of Rx buffers the client needs to prepare for the
    /// network device.
    #[fidl_field_type(optional)]
    pub min_rx_buffers: Option<NonZeroU16>,
    /// Available rx acceleration flags for this device.
    #[fidl_field_type(default)]
    pub rx_accel: Vec<netdev::RxAcceleration>,
    /// Available tx acceleration flags for this device.
    #[fidl_field_type(default)]
    pub tx_accel: Vec<netdev::TxAcceleration>,
}
422
/// Network device information with all required fields.
///
/// Validated mirror of [`netdev::DeviceInfo`].
#[derive(Debug, Clone, ValidFidlTable)]
#[fidl_table_src(netdev::DeviceInfo)]
#[fidl_table_strict]
pub struct DeviceInfo {
    /// Minimum descriptor length, in 64-bit words.
    pub min_descriptor_length: u8,
    /// Accepted descriptor version.
    pub descriptor_version: u8,
    /// Device base info.
    pub base_info: DeviceBaseInfo,
}
435
/// Basic session configuration that can be given to [`DeviceInfo`] to generate
/// [`Config`]s.
#[derive(Debug, Copy, Clone)]
pub struct DerivableConfig {
    /// The desired default buffer length for the session.
    ///
    /// The effective length is clamped to the device-reported limits by
    /// [`DeviceInfo::make_config`].
    pub default_buffer_length: usize,
    /// Enable rx lease watching.
    pub watch_rx_leases: bool,
}
445
impl DerivableConfig {
    /// A sensibly common default buffer length to be used in
    /// [`DerivableConfig`]. Provided to ease test writing.
    ///
    /// Chosen to be the next power of two after the default Ethernet MTU.
    ///
    /// This is the value of the buffer length in the `Default` impl.
    pub const DEFAULT_BUFFER_LENGTH: usize = 2048;
    /// The value returned by the `Default` impl.
    ///
    /// Defined as a `const` so it can be used in constant contexts too.
    pub const DEFAULT: Self =
        Self { default_buffer_length: Self::DEFAULT_BUFFER_LENGTH, watch_rx_leases: false };
}
458
459impl Default for DerivableConfig {
460    fn default() -> Self {
461        Self::DEFAULT
462    }
463}
464
465impl DeviceInfo {
466    /// Create a new session config from the device information.
467    ///
468    /// This method also does the boundary checks so that data_length/offset fields read
469    /// from descriptors are safe to convert to [`usize`].
470    pub fn make_config(&self, config: DerivableConfig) -> Result<Config> {
471        let DeviceInfo {
472            min_descriptor_length,
473            descriptor_version,
474            base_info:
475                DeviceBaseInfo {
476                    rx_depth,
477                    tx_depth,
478                    buffer_alignment,
479                    max_buffer_length,
480                    min_rx_buffer_length,
481                    min_tx_buffer_length,
482                    min_tx_buffer_head,
483                    min_tx_buffer_tail,
484                    max_buffer_parts: _,
485                    min_rx_buffers: _,
486                    rx_accel: _,
487                    tx_accel: _,
488                },
489        } = self;
490        if NETWORK_DEVICE_DESCRIPTOR_VERSION != *descriptor_version {
491            return Err(Error::Config(format!(
492                "descriptor version mismatch: {} != {}",
493                NETWORK_DEVICE_DESCRIPTOR_VERSION, descriptor_version
494            )));
495        }
496        if NETWORK_DEVICE_DESCRIPTOR_LENGTH < usize::from(*min_descriptor_length) {
497            return Err(Error::Config(format!(
498                "descriptor length too small: {} < {}",
499                NETWORK_DEVICE_DESCRIPTOR_LENGTH, min_descriptor_length
500            )));
501        }
502
503        let DerivableConfig { default_buffer_length, watch_rx_leases } = config;
504
505        let num_rx_buffers =
506            NonZeroU16::new(*rx_depth).ok_or_else(|| Error::Config("no RX buffers".to_owned()))?;
507        let num_tx_buffers =
508            NonZeroU16::new(*tx_depth).ok_or_else(|| Error::Config("no TX buffers".to_owned()))?;
509
510        let max_buffer_length = max_buffer_length
511            .and_then(|max| {
512                // The error case is the case where max_buffer_length can't fix in a
513                // usize, but we use it to compare it to usizes, so that's
514                // equivalent to no limit.
515                usize::try_from(max.get()).ok_checked::<TryFromIntError>()
516            })
517            .unwrap_or(usize::MAX);
518        let min_buffer_length = usize::try_from(*min_rx_buffer_length)
519            .ok_checked::<TryFromIntError>()
520            .unwrap_or(usize::MAX);
521
522        let buffer_length =
523            usize::min(max_buffer_length, usize::max(min_buffer_length, default_buffer_length));
524
525        let buffer_alignment = usize::try_from(*buffer_alignment).map_err(
526            |std::num::TryFromIntError { .. }| {
527                Error::Config(format!(
528                    "buffer_alignment not representable within usize: {}",
529                    buffer_alignment,
530                ))
531            },
532        )?;
533
534        let buffer_stride = buffer_length
535            .checked_add(buffer_alignment - 1)
536            .map(|x| x / buffer_alignment * buffer_alignment)
537            .ok_or_else(|| {
538                Error::Config(format!(
539                    "not possible to align {} to {} under usize::MAX",
540                    buffer_length, buffer_alignment,
541                ))
542            })?;
543
544        if buffer_stride < buffer_length {
545            return Err(Error::Config(format!(
546                "buffer stride too small {} < {}",
547                buffer_stride, buffer_length
548            )));
549        }
550
551        if buffer_length < usize::from(*min_tx_buffer_head) + usize::from(*min_tx_buffer_tail) {
552            return Err(Error::Config(format!(
553                "buffer length {} does not meet minimum tx buffer head/tail requirement {}/{}",
554                buffer_length, min_tx_buffer_head, min_tx_buffer_tail,
555            )));
556        }
557
558        let num_buffers =
559            rx_depth.checked_add(*tx_depth).filter(|num| *num != u16::MAX).ok_or_else(|| {
560                Error::Config(format!(
561                    "too many buffers requested: {} + {} > u16::MAX",
562                    rx_depth, tx_depth
563                ))
564            })?;
565
566        let buffer_stride =
567            u64::try_from(buffer_stride).map_err(|std::num::TryFromIntError { .. }| {
568                Error::Config(format!("buffer_stride too big: {} > u64::MAX", buffer_stride))
569            })?;
570
571        // This is following the practice of rust stdlib to ensure allocation
572        // size never reaches isize::MAX.
573        // https://doc.rust-lang.org/std/primitive.pointer.html#method.add-1.
574        match buffer_stride.checked_mul(num_buffers.into()).map(isize::try_from) {
575            None | Some(Err(std::num::TryFromIntError { .. })) => {
576                return Err(Error::Config(format!(
577                    "too much memory required for the buffers: {} * {} > isize::MAX",
578                    buffer_stride, num_buffers
579                )));
580            }
581            Some(Ok(_total)) => (),
582        };
583
584        let buffer_stride = NonZeroU64::new(buffer_stride)
585            .ok_or_else(|| Error::Config("buffer_stride is zero".to_owned()))?;
586
587        let min_tx_data = match usize::try_from(*min_tx_buffer_length)
588            .map(|min_tx| (min_tx <= buffer_length).then_some(min_tx))
589        {
590            Ok(Some(min_tx_buffer_length)) => min_tx_buffer_length,
591            // Either the conversion or the comparison failed.
592            Ok(None) | Err(std::num::TryFromIntError { .. }) => {
593                return Err(Error::Config(format!(
594                    "buffer_length smaller than minimum TX requirement: {} < {}",
595                    buffer_length, *min_tx_buffer_length
596                )));
597            }
598        };
599
600        let mut options = netdev::SessionFlags::empty();
601        options.set(netdev::SessionFlags::RECEIVE_RX_POWER_LEASES, watch_rx_leases);
602
603        Ok(Config {
604            buffer_stride,
605            num_rx_buffers,
606            num_tx_buffers,
607            options,
608            buffer_layout: BufferLayout {
609                length: buffer_length,
610                min_tx_head: *min_tx_buffer_head,
611                min_tx_tail: *min_tx_buffer_tail,
612                min_tx_data,
613            },
614        })
615    }
616}
617
/// A port of the device.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Port {
    /// Base identifier of the port; validated against `netdev::MAX_PORTS` in
    /// the `TryFrom<netdev::PortId>` impl.
    pub(crate) base: u8,
    /// Salt accompanying the base identifier, carried through unchanged from
    /// [`netdev::PortId`].
    pub(crate) salt: u8,
}
624
625impl TryFrom<netdev::PortId> for Port {
626    type Error = Error;
627    fn try_from(netdev::PortId { base, salt }: netdev::PortId) -> Result<Self> {
628        if base <= netdev::MAX_PORTS {
629            Ok(Self { base, salt })
630        } else {
631            Err(Error::InvalidPortId(base))
632        }
633    }
634}
635
636impl From<Port> for netdev::PortId {
637    fn from(Port { base, salt }: Port) -> Self {
638        Self { base, salt }
639    }
640}
641
/// Pending descriptors to be sent to driver.
struct Pending<K: AllocKind> {
    // Descriptors waiting to be written to the FIFO, plus the waker of the
    // task blocked in `poll_submit` when the queue was last empty.
    inner: Mutex<(Vec<DescId<K>>, Option<Waker>)>,
}
646
impl<K: AllocKind> Pending<K> {
    /// Creates a queue pre-populated with `descs`.
    fn new(descs: Vec<DescId<K>>) -> Self {
        Self { inner: Mutex::new((descs, None)) }
    }

    /// Extends the pending descriptors buffer.
    ///
    /// Wakes the task parked in [`Pending::poll_submit`], if any, since there
    /// is now work to submit.
    fn extend(&self, descs: impl IntoIterator<Item = DescId<K>>) {
        let mut guard = self.inner.lock();
        let (storage, waker) = &mut *guard;
        storage.extend(descs);
        if let Some(waker) = waker.take() {
            waker.wake();
        }
    }

    /// Submits the pending buffer to the driver through [`zx::Fifo`].
    ///
    /// It will return [`Poll::Pending`] if any of the following happens:
    ///   - There are no descriptors pending.
    ///   - The fifo is not ready for write.
    fn poll_submit(
        &self,
        fifo: &fasync::Fifo<DescId<K>>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<usize>> {
        let mut guard = self.inner.lock();
        let (storage, waker) = &mut *guard;
        if storage.is_empty() {
            // Park: `extend` will wake us when descriptors arrive.
            *waker = Some(cx.waker().clone());
            return Poll::Pending;
        }

        // TODO(https://fxbug.dev/42107145): We're assuming that writing to the
        // FIFO here is a sufficient memory barrier for the other end to access
        // the data. That is currently true but not really guaranteed by the
        // API.
        let submitted = ready!(fifo.try_write(cx, &storage[..]))
            .map_err(|status| Error::Fifo("write", K::REFL.as_str(), status))?;
        // Only the first `submitted` descriptors were accepted; keep the rest
        // queued for the next poll.
        let _drained = storage.drain(0..submitted);
        Poll::Ready(Ok(submitted))
    }
}
689
/// An intermediary buffer used to reduce syscall overhead by acting as a proxy
/// to read entries from a FIFO.
///
/// `ReadyBuffer` caches read entries from a FIFO in pre-allocated memory,
/// allowing different batch sizes between what is acquired from the FIFO and
/// what's processed by the caller.
struct ReadyBuffer<T> {
    // NB: A vector of `MaybeUninit` here allows us to give a transparent memory
    // layout to the FIFO object but still move objects out of our buffer
    // without needing a `T: Default` implementation. There's a small added
    // benefit of not paying for memory initialization on creation as well, but
    // that's mostly negligible given all allocation is performed upfront.
    data: Vec<MaybeUninit<T>>,
    // Index range within `data` that currently holds initialized, not yet
    // consumed entries.
    available: Range<usize>,
}
705
impl<T> Drop for ReadyBuffer<T> {
    fn drop(&mut self) {
        let Self { data, available } = self;
        for initialized in &mut data[available.clone()] {
            // SAFETY: the available range keeps track of initialized buffers,
            // we must drop them on drop to uphold `MaybeUninit` expectations.
            unsafe { initialized.assume_init_drop() }
        }
        // Mark everything as uninitialized again; guards against accidental
        // double-drop if drop logic ever runs twice.
        *available = 0..0;
    }
}
717
impl<T> ReadyBuffer<T> {
    /// Creates a buffer that can cache up to `capacity` FIFO entries.
    fn new(capacity: usize) -> Self {
        let data = std::iter::from_fn(|| Some(MaybeUninit::uninit())).take(capacity).collect();
        Self { data, available: 0..0 }
    }

    /// Pops one cached entry, refilling the cache from `fifo` when empty.
    ///
    /// Returns `Poll::Pending` when the cache is empty and the FIFO has no
    /// entries ready (the FIFO registers `cx`'s waker in that case).
    fn poll_with_fifo(
        &mut self,
        cx: &mut Context<'_>,
        fifo: &fuchsia_async::Fifo<T>,
    ) -> Poll<std::result::Result<T, zx::Status>>
    where
        T: fasync::FifoEntry,
    {
        let Self { data, available: Range { start, end } } = self;

        loop {
            // Always pop from available data first.
            if *start != *end {
                let desc = std::mem::replace(&mut data[*start], MaybeUninit::uninit());
                *start += 1;
                // SAFETY: Descriptor was in the initialized section, it was
                // initialized.
                let desc = unsafe { desc.assume_init() };
                return Poll::Ready(Ok(desc));
            }
            // Fetch more from the FIFO.
            let count = ready!(fifo.try_read(cx, &mut data[..]))?;
            *start = 0;
            *end = count;
        }
    }
}
751
/// Tracks in-flight tx buffers and notifies waiters when the count drops to
/// zero (see [`Session::wait_tx_idle`]).
struct TxIdleListeners {
    /// Notified whenever `tx_in_flight` transitions to zero.
    event: event_listener::Event,
    /// Number of tx buffers currently in flight to the device.
    tx_in_flight: AtomicUsize,
}
756
impl TxIdleListeners {
    /// Creates a tracker with no tx buffers in flight.
    fn new() -> Self {
        Self { event: event_listener::Event::new(), tx_in_flight: AtomicUsize::new(0) }
    }

    /// Decreases the number of outstanding tx buffers by 1.
    ///
    /// Notifies any tx idle listeners if the number reaches 0.
    fn tx_complete(&self) {
        let Self { event, tx_in_flight } = self;
        let old_value = tx_in_flight.fetch_sub(1, atomic::Ordering::SeqCst);
        // A completion without a matching `tx_started` is a bookkeeping bug.
        debug_assert_ne!(old_value, 0);
        if old_value == 1 {
            let _notified: usize = event.notify(usize::MAX);
        }
    }

    /// Increases the number of outstanding tx buffers by 1.
    fn tx_started(&self) {
        let Self { event: _, tx_in_flight } = self;
        let _: usize = tx_in_flight.fetch_add(1, atomic::Ordering::SeqCst);
    }

    /// Resolves once no tx buffers are in flight.
    async fn wait(&self) {
        let Self { event, tx_in_flight } = self;
        // This is _the correct way_ of holding an `event_listener::Listener`.
        // We check the condition before installing the listener in the fast
        // case, then we must check the condition again after creating the
        // listener in case we've raced with the condition updating. Finally we
        // must loop and check the condition again because we're not fully
        // guaranteed to not have spurious wakeups.
        //
        // See the event_listener crate documentation for more details.
        loop {
            if tx_in_flight.load(atomic::Ordering::SeqCst) == 0 {
                return;
            }

            event_listener::listener!(event => listener);

            if tx_in_flight.load(atomic::Ordering::SeqCst) == 0 {
                return;
            }

            listener.await;
        }
    }
}
805
/// An RAII lease possibly keeping the system from suspension.
///
/// Yielded from [`Session::watch_rx_leases`].
///
/// Dropping an `RxLease` relinquishes it.
#[derive(Debug)]
pub struct RxLease {
    // Underlying FIDL handle; dropping it relinquishes the lease (see the
    // `Drop` impl).
    handle: netdev::DelegatedRxLeaseHandle,
}
815
impl Drop for RxLease {
    // Relinquishes the lease by dropping the delegated handle.
    fn drop(&mut self) {
        let Self { handle } = self;
        // Change detector in case we need any evolution on how to relinquish
        // leases.
        match handle {
            netdev::DelegatedRxLeaseHandle::Channel(_channel) => {
                // Dropping the channel is enough to relinquish the lease.
            }
            netdev::DelegatedRxLeaseHandle::Eventpair(_eventpair) => {
                // Dropping the eventpair is enough to relinquish the lease.
            }
            // Unknown future variants: nothing special to do beyond dropping.
            netdev::DelegatedRxLeaseHandle::__SourceBreaking { .. } => {}
        }
    }
}
832
833impl RxLease {
834    /// Peeks the internal lease.
835    pub fn inner(&self) -> &netdev::DelegatedRxLeaseHandle {
836        &self.handle
837    }
838}
839
840#[cfg(test)]
841mod tests {
842    use std::num::NonZeroU32;
843    use std::ops::Deref;
844    use std::sync::Arc;
845    use std::task::Poll;
846
847    use assert_matches::assert_matches;
848    use fuchsia_async::Fifo;
849    use test_case::test_case;
850    use zerocopy::{FromBytes, Immutable, IntoBytes};
851    use zx::HandleBased as _;
852
853    use crate::session::DerivableConfig;
854
855    use super::buffer::{
856        AllocKind, DescId, NETWORK_DEVICE_DESCRIPTOR_LENGTH, NETWORK_DEVICE_DESCRIPTOR_VERSION,
857    };
858    use super::{
859        BufferLayout, Config, DeviceBaseInfo, DeviceInfo, Error, Inner, Mutex, Pending, Pool,
860        ReadyBuffer, Task, TxIdleListeners,
861    };
862
    /// A minimal baseline `DeviceBaseInfo` (depth-1 FIFOs, no alignment or
    /// buffer-length constraints) that individual test cases override.
    const DEFAULT_DEVICE_BASE_INFO: DeviceBaseInfo = DeviceBaseInfo {
        rx_depth: 1,
        tx_depth: 1,
        buffer_alignment: 1,
        max_buffer_length: None,
        min_rx_buffer_length: 0,
        min_tx_buffer_head: 0,
        min_tx_buffer_length: 0,
        min_tx_buffer_tail: 0,
        max_buffer_parts: fidl_fuchsia_hardware_network::MAX_DESCRIPTOR_CHAIN,
        min_rx_buffers: None,
        rx_accel: Vec::new(),
        tx_accel: Vec::new(),
    };
877
    /// A minimal `DeviceInfo` wrapping [`DEFAULT_DEVICE_BASE_INFO`], matching
    /// the descriptor layout the client library produces.
    const DEFAULT_DEVICE_INFO: DeviceInfo = DeviceInfo {
        min_descriptor_length: 0,
        descriptor_version: 1,
        base_info: DEFAULT_DEVICE_BASE_INFO,
    };
883
884    const DEFAULT_BUFFER_LENGTH: usize = 2048;
885
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        min_descriptor_length: u8::MAX,
        ..DEFAULT_DEVICE_INFO
    }, format!("descriptor length too small: {} < {}", NETWORK_DEVICE_DESCRIPTOR_LENGTH, u8::MAX))]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        descriptor_version: 42,
        ..DEFAULT_DEVICE_INFO
    }, format!("descriptor version mismatch: {} != {}", NETWORK_DEVICE_DESCRIPTOR_VERSION, 42))]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            tx_depth: 0,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, "no TX buffers")]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            rx_depth: 0,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, "no RX buffers")]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            tx_depth: u16::MAX,
            rx_depth: u16::MAX,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, format!("too many buffers requested: {} + {} > u16::MAX", u16::MAX, u16::MAX))]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            min_tx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 + 1,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, format!(
        "buffer_length smaller than minimum TX requirement: {} < {}",
        DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            min_tx_buffer_head: DEFAULT_BUFFER_LENGTH as u16 + 1,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, format!(
        "buffer length {} does not meet minimum tx buffer head/tail requirement {}/0",
        DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            min_tx_buffer_tail: DEFAULT_BUFFER_LENGTH as u16 + 1,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, format!(
        "buffer length {} does not meet minimum tx buffer head/tail requirement 0/{}",
        DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
    #[test_case(0, DEFAULT_DEVICE_INFO, "buffer_stride is zero")]
    #[test_case(usize::MAX, DEFAULT_DEVICE_INFO,
    format!(
        "too much memory required for the buffers: {} * {} > isize::MAX",
        usize::MAX, 2))]
    #[test_case(usize::MAX, DeviceInfo {
        base_info: DeviceBaseInfo {
            buffer_alignment: 2,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, format!(
        "not possible to align {} to {} under usize::MAX",
        usize::MAX, 2))]
    /// Each case supplies a buffer length and a `DeviceInfo` that violate one
    /// of the session config invariants, and asserts `make_config` fails with
    /// exactly the expected error message.
    fn configs_from_device_info_err(
        buffer_length: usize,
        info: DeviceInfo,
        expected: impl Deref<Target = str>,
    ) {
        let config = DerivableConfig { default_buffer_length: buffer_length, ..Default::default() };
        assert_matches!(
            info.make_config(config),
            Err(Error::Config(got)) if got.as_str() == expected.deref()
        );
    }
968
    #[test_case(DeviceInfo {
        base_info: DeviceBaseInfo {
            min_rx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 + 1,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, DEFAULT_BUFFER_LENGTH + 1; "default below min")]
    #[test_case(DeviceInfo {
        base_info: DeviceBaseInfo {
            max_buffer_length: NonZeroU32::new(DEFAULT_BUFFER_LENGTH as u32 - 1),
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, DEFAULT_BUFFER_LENGTH - 1; "default above max")]
    #[test_case(DeviceInfo {
        base_info: DeviceBaseInfo {
            min_rx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 - 1,
            max_buffer_length: NonZeroU32::new(DEFAULT_BUFFER_LENGTH as u32 + 1),
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, DEFAULT_BUFFER_LENGTH; "default in bounds")]
    /// Asserts the derived buffer length is the requested default clamped to
    /// the device's min/max buffer-length bounds.
    fn configs_from_device_buffer_length(info: DeviceInfo, expected_length: usize) {
        let config = info
            .make_config(DerivableConfig {
                default_buffer_length: DEFAULT_BUFFER_LENGTH,
                ..Default::default()
            })
            .expect("is valid");
        // Exhaustive destructure: adding a field to `Config` forces this test
        // to be revisited.
        let Config {
            buffer_layout: BufferLayout { length, min_tx_data: _, min_tx_head: _, min_tx_tail: _ },
            buffer_stride: _,
            num_rx_buffers: _,
            num_tx_buffers: _,
            options: _,
        } = config;
        assert_eq!(length, expected_length);
    }
1007
1008    fn make_fifos<K: AllocKind>() -> (Fifo<DescId<K>>, zx::Fifo<DescId<K>>) {
1009        let (handle, other_end) = zx::Fifo::create(1).unwrap();
1010        (Fifo::from_fifo(handle), other_end)
1011    }
1012
1013    fn remove_rights<T: FromBytes + IntoBytes + Immutable>(
1014        fifo: Fifo<T>,
1015        rights_to_remove: zx::Rights,
1016    ) -> Fifo<T> {
1017        let fifo = zx::Fifo::from(fifo);
1018        let rights = fifo.as_handle_ref().basic_info().expect("can retrieve info").rights;
1019
1020        let fifo = fifo.replace_handle(rights ^ rights_to_remove).expect("can replace");
1021        Fifo::from_fifo(fifo)
1022    }
1023
    // Selects which of the session's two FIFOs a test case attenuates.
    enum TxOrRx {
        Tx,
        Rx,
    }
    #[test_case(TxOrRx::Tx, zx::Rights::READ; "tx read")]
    #[test_case(TxOrRx::Tx, zx::Rights::WRITE; "tx write")]
    // NOTE(review): this case removes WRITE but is named "rx read" — confirm
    // whether the case name or the removed right is the intended one.
    #[test_case(TxOrRx::Rx, zx::Rights::WRITE; "rx read")]
    #[fuchsia_async::run_singlethreaded(test)]
    async fn task_as_future_poll_error(which_fifo: TxOrRx, right_to_remove: zx::Rights) {
        // This is a regression test for https://fxbug.dev/42072513. The flake
        // that caused that bug occurred because the Zircon channel was closed
        // but the error returned by a failed attempt to write to it wasn't
        // being propagated upwards. This test produces a similar situation by
        // altering the right on the FIFOs the task uses so as to cause either
        // an attempt to write or to read to fail. For completeness, it
        // exercises all the FIFO polls that comprise Task::poll.
        let (pool, _descriptors, _data) = Pool::new(
            DEFAULT_DEVICE_INFO
                .make_config(DerivableConfig {
                    default_buffer_length: DEFAULT_BUFFER_LENGTH,
                    ..Default::default()
                })
                .expect("is valid"),
        )
        .expect("is valid");
        let (session_proxy, _session_server) =
            fidl::endpoints::create_proxy::<fidl_fuchsia_hardware_network::SessionMarker>();

        let (rx, _rx_sender) = make_fifos();
        let (tx, _tx_receiver) = make_fifos();

        // Attenuate rights on one of the FIFOs.
        let (tx, rx) = match which_fifo {
            TxOrRx::Tx => (remove_rights(tx, right_to_remove), rx),
            TxOrRx::Rx => (tx, remove_rights(rx, right_to_remove)),
        };

        // Put one tx buffer in flight so the task has FIFO work to attempt.
        let buf = pool.alloc_tx_buffer(1).await.expect("can allocate");
        let inner = Arc::new(Inner {
            pool,
            proxy: session_proxy,
            name: "fake_task".to_string(),
            rx,
            tx,
            tx_pending: Pending::new(vec![]),
            rx_ready: Mutex::new(ReadyBuffer::new(10)),
            tx_ready: Mutex::new(ReadyBuffer::new(10)),
            tx_idle_listeners: TxIdleListeners::new(),
        });

        inner.send(buf).expect("can send");

        let mut task = Task { inner };

        // The task should not be able to continue because it can't read from or
        // write to one of the FIFOs.
        assert_matches!(futures::poll!(&mut task), Poll::Ready(Err(Error::Fifo(_, _, _))));
    }
1082}