netstack3_device/queue/api.rs

// Copyright 2024 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! Transmit and Receive queue API objects.

use core::marker::PhantomData;

use log::debug;
use netstack3_base::{
    ContextPair, Device, DeviceIdContext, ResourceCounterContext, WorkQueueReport,
};

use crate::internal::base::DeviceSendFrameError;
use crate::internal::queue::rx::{
    ReceiveDequeContext, ReceiveDequeFrameContext as _, ReceiveQueueBindingsContext,
    ReceiveQueueContext as _, ReceiveQueueState,
};
use crate::internal::queue::tx::{
    self, TransmitDequeueContext, TransmitQueueBindingsContext, TransmitQueueCommon,
    TransmitQueueConfiguration, TransmitQueueContext as _, TransmitQueueState,
};
use crate::internal::queue::{fifo, BatchSize, DequeueResult, DequeueState};
use crate::internal::socket::DeviceSocketHandler;
use crate::DeviceCounters;

/// An API to interact with device `D` transmit queues.
pub struct TransmitQueueApi<D, C>(C, PhantomData<D>);

impl<D, C> TransmitQueueApi<D, C> {
    /// Creates a new [`TransmitQueueApi`] from `ctx`.
    pub fn new(ctx: C) -> Self {
        Self(ctx, PhantomData)
    }
}

impl<D, C> TransmitQueueApi<D, C>
where
    D: Device,
    C: ContextPair,
    C::CoreContext:
        TransmitDequeueContext<D, C::BindingsContext> + DeviceSocketHandler<D, C::BindingsContext>,
    for<'a> <C::CoreContext as TransmitDequeueContext<D, C::BindingsContext>>::TransmitQueueCtx<'a>:
        ResourceCounterContext<<C::CoreContext as DeviceIdContext<D>>::DeviceId, DeviceCounters>,
    C::BindingsContext:
        TransmitQueueBindingsContext<<C::CoreContext as DeviceIdContext<D>>::DeviceId>,
{
    fn contexts(&mut self) -> (&mut C::CoreContext, &mut C::BindingsContext) {
        let Self(pair, PhantomData) = self;
        pair.contexts()
    }

    fn core_ctx(&mut self) -> &mut C::CoreContext {
        self.contexts().0
    }

    /// Transmits any queued frames.
    ///
    /// At most `batch_size` frames will be dequeued and sent in this call.
    ///
    /// `dequeue_context` is passed unmodified to the context to operate on
    /// each individual frame.
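    ///
    /// # Example
    ///
    /// A minimal sketch of driving this method from a bindings TX task; `ctx`,
    /// `device_id`, and `dequeue_ctx` are illustrative placeholders for
    /// bindings-provided values, so the example is not compiled.
    ///
    /// ```ignore
    /// let mut api = TransmitQueueApi::new(ctx);
    /// let report = api
    ///     .transmit_queued_frames(&device_id, BatchSize::MAX, &mut dequeue_ctx)
    ///     .expect("device failed to send");
    /// // `WorkQueueReport::AllDone` means the queue was drained; otherwise the
    /// // TX task should run again to send the frames still queued.
    /// match report {
    ///     WorkQueueReport::AllDone => {}
    ///     _ => { /* reschedule the TX task */ }
    /// }
    /// ```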
    pub fn transmit_queued_frames(
        &mut self,
        device_id: &<C::CoreContext as DeviceIdContext<D>>::DeviceId,
        batch_size: BatchSize,
        dequeue_context: &mut <
            C::CoreContext as TransmitQueueCommon<D, C::BindingsContext>
        >::DequeueContext,
    ) -> Result<WorkQueueReport, DeviceSendFrameError> {
        let (core_ctx, bindings_ctx) = self.contexts();
        core_ctx.with_dequed_packets_and_tx_queue_ctx(
            device_id,
            |DequeueState { dequeued_frames: dequed_packets }, tx_queue_ctx| {
                assert!(
                    dequed_packets.is_empty(),
                    "should never have leftover packets from a previous dequeue"
                );

                let ret = tx_queue_ctx.with_transmit_queue_mut(
                    device_id,
                    |TransmitQueueState { allocator: _, queue }| {
                        queue.as_mut().map(|q| q.dequeue_into(dequed_packets, batch_size.into()))
                    },
                );

                // If we don't have a transmit queue installed, report no work
                // left to be done.
                let Some(ret) = ret else { return Ok(WorkQueueReport::AllDone) };

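                // Deliver each dequeued frame to interested packet sockets
                // before handing it to the device for transmission.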
                while let Some((meta, p)) = dequed_packets.pop_front() {
                    tx::deliver_to_device_sockets(tx_queue_ctx, bindings_ctx, device_id, &p, &meta);

                    match tx_queue_ctx.send_frame(
                        bindings_ctx,
                        device_id,
                        Some(dequeue_context),
                        meta,
                        p,
                    ) {
                        Ok(()) => {}
                        Err(e) => {
                            tx_queue_ctx.increment_both(device_id, |c| &c.send_dropped_dequeue);
                            // Sending failed, so requeue the remaining packets
                            // and try again later. The failed packet itself is
                            // lost; it must not be requeued because it has
                            // already been delivered to packet sockets.
                            tx_queue_ctx.with_transmit_queue_mut(
                                device_id,
                                |TransmitQueueState { allocator: _, queue }| {
                                    queue.as_mut().unwrap().requeue_items(dequed_packets);
                                },
                            );
                            return Err(e);
                        }
                    }
                }

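                // Convert the dequeue result into a work queue report so the
                // caller knows whether frames remain queued.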
                Ok(ret.into())
            },
        )
    }

    /// Returns the number of frames in `device_id`'s TX queue.
    ///
    /// Returns `None` if the device doesn't have a queue configured.
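    ///
    /// A small sketch (`ctx` and `device_id` are illustrative placeholders, so
    /// the example is not compiled):
    ///
    /// ```ignore
    /// let mut api = TransmitQueueApi::new(ctx);
    /// if let Some(len) = api.count(&device_id) {
    ///     // A transmit queue is configured and currently holds `len` frames.
    /// }
    /// ```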
    pub fn count(
        &mut self,
        device_id: &<C::CoreContext as DeviceIdContext<D>>::DeviceId,
    ) -> Option<usize> {
        self.core_ctx().with_transmit_queue(device_id, |TransmitQueueState { queue, .. }| {
            queue.as_ref().map(|q| q.len())
        })
    }

    /// Sets the queue configuration for the device.
    ///
    /// If this removes a previously installed queue, any frames still queued
    /// are drained and sent directly to the device (or dropped if the device
    /// rejects them) before this returns.
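    ///
    /// A sketch of toggling the configuration (`ctx` and `device_id` are
    /// illustrative placeholders, so the example is not compiled):
    ///
    /// ```ignore
    /// let mut api = TransmitQueueApi::new(ctx);
    /// // Install a FIFO transmit queue; it starts out empty.
    /// api.set_configuration(&device_id, TransmitQueueConfiguration::Fifo);
    /// assert_eq!(api.count(&device_id), Some(0));
    /// // Removing the queue drains any frames that were queued in the interim.
    /// api.set_configuration(&device_id, TransmitQueueConfiguration::None);
    /// assert_eq!(api.count(&device_id), None);
    /// ```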
    pub fn set_configuration(
        &mut self,
        device_id: &<C::CoreContext as DeviceIdContext<D>>::DeviceId,
        config: TransmitQueueConfiguration,
    ) {
        let (core_ctx, bindings_ctx) = self.contexts();
        // We take the dequeue lock as well to make sure we finish any current
        // dequeuing before changing the configuration.
        core_ctx.with_dequed_packets_and_tx_queue_ctx(
            device_id,
            |DequeueState { dequeued_frames: dequed_packets }, tx_queue_ctx| {
                assert!(
                    dequed_packets.is_empty(),
                    "should never have leftover packets from a previous dequeue"
                );

                let prev_queue = tx_queue_ctx.with_transmit_queue_mut(
                    device_id,
                    |TransmitQueueState { allocator: _, queue }| {
                        match config {
                            TransmitQueueConfiguration::None => core::mem::take(queue),
                            TransmitQueueConfiguration::Fifo => {
                                match queue {
                                    None => *queue = Some(fifo::Queue::default()),
                                    // Already a FIFO queue.
                                    Some(_) => {}
                                }

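                                // Installing a FIFO queue leaves no previous
                                // queue to drain.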
                                None
                            }
                        }
                    },
                );

                let Some(mut prev_queue) = prev_queue else { return };

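                // Drain the queue that was just removed, delivering each frame
                // to packet sockets and sending it directly to the device.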
                loop {
                    let ret = prev_queue.dequeue_into(dequed_packets, BatchSize::MAX);

                    while let Some((meta, p)) = dequed_packets.pop_front() {
                        tx::deliver_to_device_sockets(
                            tx_queue_ctx,
                            bindings_ctx,
                            device_id,
                            &p,
                            &meta,
                        );
                        match tx_queue_ctx.send_frame(bindings_ctx, device_id, None, meta, p) {
                            Ok(()) => {}
                            Err(err) => {
                                // The device rejected the frame and there is no
                                // longer a queue to put it back on, so drop it.
                                debug!("frame dropped during queue reconfiguration: {err:?}");
                            }
                        }
                    }

                    match ret {
                        DequeueResult::NoMoreLeft => break,
                        DequeueResult::MoreStillQueued => {}
                    }
                }
            },
        )
    }
}

/// An API to interact with device `D` receive queues.
pub struct ReceiveQueueApi<D, C>(C, PhantomData<D>);

impl<D, C> ReceiveQueueApi<D, C> {
    /// Creates a new [`ReceiveQueueApi`] from `ctx`.
    pub fn new(ctx: C) -> Self {
        Self(ctx, PhantomData)
    }
}

impl<D, C> ReceiveQueueApi<D, C>
where
    D: Device,
    C: ContextPair,
    C::BindingsContext:
        ReceiveQueueBindingsContext<<C::CoreContext as DeviceIdContext<D>>::DeviceId>,
    C::CoreContext: ReceiveDequeContext<D, C::BindingsContext>,
{
    fn contexts(&mut self) -> (&mut C::CoreContext, &mut C::BindingsContext) {
        let Self(pair, PhantomData) = self;
        pair.contexts()
    }

    /// Handles a batch of queued RX packets for the device.
    ///
    /// If packets remain in the RX queue after a batch of RX packets has been
    /// handled, the RX task will be scheduled to run again so the next batch of
    /// RX packets may be handled. See
    /// [`ReceiveQueueBindingsContext::wake_rx_task`] for more details.
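    ///
    /// # Example
    ///
    /// A sketch of an RX task driving this method (`ctx` and `device_id` are
    /// illustrative placeholders, so the example is not compiled):
    ///
    /// ```ignore
    /// let mut api = ReceiveQueueApi::new(ctx);
    /// // Keep handling batches until the queue is drained; a real RX task
    /// // would instead yield and wait to be woken via `wake_rx_task`.
    /// while !matches!(api.handle_queued_frames(&device_id), WorkQueueReport::AllDone) {}
    /// ```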
    pub fn handle_queued_frames(
        &mut self,
        device_id: &<C::CoreContext as DeviceIdContext<D>>::DeviceId,
    ) -> WorkQueueReport {
        let (core_ctx, bindings_ctx) = self.contexts();
        core_ctx.with_dequed_frames_and_rx_queue_ctx(
            device_id,
            |DequeueState { dequeued_frames }, rx_queue_ctx| {
                assert_eq!(
                    dequeued_frames.len(),
                    0,
                    "should not keep dequeued frames across calls to this fn"
                );

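                // Dequeue up to a full batch of frames while holding the
                // receive queue lock.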
                let ret = rx_queue_ctx.with_receive_queue_mut(
                    device_id,
                    |ReceiveQueueState { queue }| {
                        queue.dequeue_into(dequeued_frames, BatchSize::MAX)
                    },
                );

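                // Handle each frame now that the receive queue lock has been
                // released.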
                while let Some((meta, p)) = dequeued_frames.pop_front() {
                    rx_queue_ctx.handle_frame(bindings_ctx, device_id, meta, p);
                }

                ret.into()
            },
        )
    }
}