Skip to main content

netstack3_device/queue/
api.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! Transmit and Receive queue API objects.
6
7use core::marker::PhantomData;
8
9use netstack3_base::{
10    ContextPair, Device, DeviceIdContext, ResourceCounterContext, WorkQueueReport,
11};
12
13use crate::DeviceCounters;
14use crate::internal::base::DeviceSendFrameError;
15use crate::internal::queue::rx::{
16    ReceiveDequeContext, ReceiveDequeFrameContext as _, ReceiveQueueBindingsContext,
17    ReceiveQueueContext as _, ReceiveQueueState,
18};
19use crate::internal::queue::tx::{
20    self, TransmitDequeueContext, TransmitQueueBindingsContext, TransmitQueueCommon,
21    TransmitQueueConfiguration, TransmitQueueContext as _, TransmitQueueState,
22};
23use crate::internal::queue::{BatchSize, DequeueResult, DequeueState, DeviceBufferSpec, fifo};
24use crate::internal::socket::DeviceSocketHandler;
25use log::debug;
26
/// API object for operating on the transmit queues of devices of type `D`.
///
/// Wraps a context `C`; the device type `D` is carried purely at the type
/// level through [`PhantomData`].
pub struct TransmitQueueApi<D, C>(C, PhantomData<D>);

impl<D, C> TransmitQueueApi<D, C> {
    /// Wraps `ctx` in a new [`TransmitQueueApi`].
    pub fn new(ctx: C) -> Self {
        TransmitQueueApi(ctx, PhantomData)
    }
}
36
37impl<D, C> TransmitQueueApi<D, C>
38where
39    D: DeviceBufferSpec<C::BindingsContext>,
40    C: ContextPair,
41    C::CoreContext:
42        TransmitDequeueContext<D, C::BindingsContext> + DeviceSocketHandler<D, C::BindingsContext>,
43    for<'a> <C::CoreContext as TransmitDequeueContext<D, C::BindingsContext>>::TransmitQueueCtx<'a>:
44        ResourceCounterContext<<C::CoreContext as DeviceIdContext<D>>::DeviceId, DeviceCounters>,
45    C::BindingsContext:
46        TransmitQueueBindingsContext<<C::CoreContext as DeviceIdContext<D>>::DeviceId>,
47{
48    fn contexts(&mut self) -> (&mut C::CoreContext, &mut C::BindingsContext) {
49        let Self(pair, PhantomData) = self;
50        pair.contexts()
51    }
52
53    fn core_ctx(&mut self) -> &mut C::CoreContext {
54        self.contexts().0
55    }
56
57    /// Transmits any queued frames.
58    ///
59    /// Up to `batch_size` frames will attempt to be dequeued and sent in this
60    /// call.
61    ///
62    /// `dequeue_context` is directly given to the context to operate on each
63    /// individual frame.
64    pub fn transmit_queued_frames(
65        &mut self,
66        device_id: &<C::CoreContext as DeviceIdContext<D>>::DeviceId,
67        batch_size: BatchSize,
68        dequeue_context: &mut <
69            C::CoreContext as TransmitQueueCommon<D, C::BindingsContext>
70        >::DequeueContext,
71    ) -> Result<WorkQueueReport, DeviceSendFrameError> {
72        let (core_ctx, bindings_ctx) = self.contexts();
73        core_ctx.with_dequed_packets_and_tx_queue_ctx(
74            device_id,
75            |DequeueState { dequeued_frames: dequed_packets }, tx_queue_ctx| {
76                assert!(
77                    dequed_packets.is_empty(),
78                    "should never have left packets after attempting to dequeue"
79                );
80
81                let ret = tx_queue_ctx.with_transmit_queue_mut(
82                    device_id,
83                    |TransmitQueueState { allocator: _, queue, checksum_offload_spec: _ }| {
84                        queue.as_mut().map(|q| q.dequeue_into(dequed_packets, batch_size.into()))
85                    },
86                );
87
88                // If we don't have a transmit queue installed, report no work
89                // left to be done.
90                let Some(ret) = ret else { return Ok(WorkQueueReport::AllDone) };
91
92                while let Some((meta, p)) = dequed_packets.pop_front() {
93                    tx::deliver_to_device_sockets(tx_queue_ctx, bindings_ctx, device_id, &p, &meta);
94
95                    match tx_queue_ctx.send_frame(
96                        bindings_ctx,
97                        device_id,
98                        Some(dequeue_context),
99                        meta,
100                        p,
101                    ) {
102                        Ok(()) => {}
103                        Err(e) => {
104                            tx_queue_ctx.increment_both(device_id, |c| &c.send_dropped_dequeue);
105                            // We failed to send the frame so requeue the rest
106                            // and try again later. The failed packet is lost.
107                            // We shouldn't requeue it because it's already been
108                            // delivered to packet sockets.
109                            tx_queue_ctx.with_transmit_queue_mut(
110                                device_id,
111                                |TransmitQueueState {
112                                     allocator: _,
113                                     queue,
114                                     checksum_offload_spec: _,
115                                 }| {
116                                    queue.as_mut().unwrap().requeue_items(dequed_packets);
117                                },
118                            );
119                            return Err(e);
120                        }
121                    }
122                }
123
124                Ok(ret.into())
125            },
126        )
127    }
128
129    /// Returns the number of frames in `device_id`'s TX queue.
130    ///
131    /// Returns `None` if the device doesn't have a queue configured.
132    pub fn count(
133        &mut self,
134        device_id: &<C::CoreContext as DeviceIdContext<D>>::DeviceId,
135    ) -> Option<usize> {
136        self.with_count(device_id, |count| count)
137    }
138
139    /// Calls `f` with the number of frames in `device_id`'s TX queue while the
140    /// Tx queue is locked.
141    pub fn with_count<R>(
142        &mut self,
143        device_id: &<C::CoreContext as DeviceIdContext<D>>::DeviceId,
144        f: impl FnOnce(Option<usize>) -> R,
145    ) -> R {
146        let count = self
147            .core_ctx()
148            .with_transmit_queue(device_id, |TransmitQueueState { queue, .. }| {
149                queue.as_ref().map(|q| q.len())
150            });
151        f(count)
152    }
153
154    /// Sets the queue configuration for the device.
155    pub fn set_configuration(
156        &mut self,
157        device_id: &<C::CoreContext as DeviceIdContext<D>>::DeviceId,
158        config: TransmitQueueConfiguration,
159    ) {
160        let (core_ctx, bindings_ctx) = self.contexts();
161        // We take the dequeue lock as well to make sure we finish any current
162        // dequeuing before changing the configuration.
163        core_ctx.with_dequed_packets_and_tx_queue_ctx(
164            device_id,
165            |DequeueState { dequeued_frames: dequed_packets }, tx_queue_ctx| {
166                assert!(
167                    dequed_packets.is_empty(),
168                    "should never have left packets after attempting to dequeue"
169                );
170
171                let prev_queue = tx_queue_ctx.with_transmit_queue_mut(
172                    device_id,
173                    |TransmitQueueState { allocator: _, queue, checksum_offload_spec: _ }| {
174                        match config {
175                            TransmitQueueConfiguration::None => core::mem::take(queue),
176                            TransmitQueueConfiguration::Fifo => {
177                                match queue {
178                                    None => *queue = Some(fifo::Queue::default()),
179                                    // Already a FiFo queue.
180                                    Some(_) => {}
181                                }
182
183                                None
184                            }
185                        }
186                    },
187                );
188
189                let Some(mut prev_queue) = prev_queue else { return };
190
191                loop {
192                    let ret = prev_queue.dequeue_into(dequed_packets, BatchSize::MAX);
193
194                    while let Some((meta, p)) = dequed_packets.pop_front() {
195                        tx::deliver_to_device_sockets(
196                            tx_queue_ctx,
197                            bindings_ctx,
198                            device_id,
199                            &p,
200                            &meta,
201                        );
202                        match tx_queue_ctx.send_frame(bindings_ctx, device_id, None, meta, p) {
203                            Ok(()) => {}
204                            Err(err) => {
205                                // We swapped to no-queue and device cannot send
206                                // the frame so we just drop it.
207                                debug!("frame dropped during queue reconfiguration: {err:?}");
208                            }
209                        }
210                    }
211
212                    match ret {
213                        DequeueResult::NoMoreLeft => break,
214                        DequeueResult::MoreStillQueued => {}
215                    }
216                }
217            },
218        )
219    }
220}
221
/// API object for operating on the receive queues of devices of type `D`.
///
/// Wraps a context `C`; the device type `D` is carried purely at the type
/// level through [`PhantomData`].
pub struct ReceiveQueueApi<D, C>(C, PhantomData<D>);

impl<D, C> ReceiveQueueApi<D, C> {
    /// Wraps `ctx` in a new [`ReceiveQueueApi`].
    pub fn new(ctx: C) -> Self {
        ReceiveQueueApi(ctx, PhantomData)
    }
}
231
232impl<D, C> ReceiveQueueApi<D, C>
233where
234    D: Device,
235    C: ContextPair,
236    C::BindingsContext:
237        ReceiveQueueBindingsContext<<C::CoreContext as DeviceIdContext<D>>::DeviceId>,
238    C::CoreContext: ReceiveDequeContext<D, C::BindingsContext>,
239{
240    fn contexts(&mut self) -> (&mut C::CoreContext, &mut C::BindingsContext) {
241        let Self(pair, PhantomData) = self;
242        pair.contexts()
243    }
244
245    /// Handle a batch of queued RX packets for the device.
246    ///
247    /// If packets remain in the RX queue after a batch of RX packets has been
248    /// handled, the RX task will be scheduled to run again so the next batch of
249    /// RX packets may be handled. See
250    /// [`ReceiveQueueBindingsContext::wake_rx_task`] for more details.
251    pub fn handle_queued_frames(
252        &mut self,
253        device_id: &<C::CoreContext as DeviceIdContext<D>>::DeviceId,
254    ) -> WorkQueueReport {
255        let (core_ctx, bindings_ctx) = self.contexts();
256        core_ctx.with_dequed_frames_and_rx_queue_ctx(
257            device_id,
258            |DequeueState { dequeued_frames }, rx_queue_ctx| {
259                assert_eq!(
260                    dequeued_frames.len(),
261                    0,
262                    "should not keep dequeued frames across calls to this fn"
263                );
264
265                let ret = rx_queue_ctx.with_receive_queue_mut(
266                    device_id,
267                    |ReceiveQueueState { queue }| {
268                        queue.dequeue_into(dequeued_frames, BatchSize::MAX)
269                    },
270                );
271
272                while let Some((meta, p)) = dequeued_frames.pop_front() {
273                    rx_queue_ctx.handle_frame(bindings_ctx, device_id, meta, p);
274                }
275
276                ret.into()
277            },
278        )
279    }
280}