netstack3_device/queue/
rx.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! RX device queues.
6
7use core::fmt::Debug;
8
9use derivative::Derivative;
10use netstack3_base::sync::Mutex;
11use netstack3_base::{Device, DeviceIdContext};
12use packet::BufferMut;
13
14use crate::internal::queue::{DequeueState, EnqueueResult, ReceiveQueueFullError, fifo};
15
16/// The state used to hold a queue of received frames to be handled at a later
17/// time.
18#[derive(Derivative)]
19#[derivative(Default(bound = ""))]
20#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
21pub struct ReceiveQueueState<Meta, Buffer> {
22    pub(super) queue: fifo::Queue<Meta, Buffer>,
23}
24
25#[cfg(any(test, feature = "testutils"))]
26impl<Meta, Buffer> ReceiveQueueState<Meta, Buffer> {
27    /// Takes all the pending frames from the receive queue.
28    pub fn take_frames(&mut self) -> impl Iterator<Item = (Meta, Buffer)> + use<Meta, Buffer> {
29        let Self { queue } = self;
30        let mut vec = Default::default();
31        assert_matches::assert_matches!(
32            queue.dequeue_into(&mut vec, usize::MAX),
33            crate::internal::queue::DequeueResult::NoMoreLeft
34        );
35        vec.into_iter()
36    }
37
38    /// Returns an iterator over the queued frames.
39    pub fn iter_frames(&self) -> impl Iterator<Item = &(Meta, Buffer)> {
40        self.queue.iter_frames()
41    }
42}
43
44/// The bindings context for the receive queue.
45pub trait ReceiveQueueBindingsContext<DeviceId> {
46    /// Signals to bindings that RX frames are available and ready to be handled
47    /// by device.
48    ///
49    /// Implementations must make sure that the API call to handle queued
50    /// packets is scheduled to be called as soon as possible so that enqueued
51    /// RX frames are promptly handled.
52    fn wake_rx_task(&mut self, device_id: &DeviceId);
53}
54
55/// Holds queue and dequeue state for the receive queue.
56#[derive(Derivative)]
57#[derivative(Default(bound = ""))]
58pub struct ReceiveQueue<Meta, Buffer> {
59    /// The state for dequeued frames that will be handled.
60    ///
61    /// See `queue` for lock ordering.
62    pub(crate) deque: Mutex<DequeueState<Meta, Buffer>>,
63    /// A queue of received frames protected by a lock.
64    ///
65    /// Lock ordering: `deque` must be locked before `queue` is locked when both
66    /// are needed at the same time.
67    pub(crate) queue: Mutex<ReceiveQueueState<Meta, Buffer>>,
68}
69
70/// Defines opaque types for frames in the receive queue.
71pub trait ReceiveQueueTypes<D: Device, BC>: DeviceIdContext<D> {
72    /// Metadata associated with an RX frame.
73    type Meta;
74
75    /// The type of buffer holding an RX frame.
76    type Buffer: BufferMut + Debug;
77}
78
79/// The execution context for a receive queue.
80pub trait ReceiveQueueContext<D: Device, BC>: ReceiveQueueTypes<D, BC> {
81    /// Calls the function with the RX queue state.
82    fn with_receive_queue_mut<O, F: FnOnce(&mut ReceiveQueueState<Self::Meta, Self::Buffer>) -> O>(
83        &mut self,
84        device_id: &Self::DeviceId,
85        cb: F,
86    ) -> O;
87}
88
89pub trait ReceiveDequeFrameContext<D: Device, BC>: ReceiveQueueTypes<D, BC> {
90    /// Handle a received frame.
91    fn handle_frame(
92        &mut self,
93        bindings_ctx: &mut BC,
94        device_id: &Self::DeviceId,
95        meta: Self::Meta,
96        buf: Self::Buffer,
97    );
98}
99
100/// The core execution context for dequeuing frames from the receive queue.
101pub trait ReceiveDequeContext<D: Device, BC>: ReceiveQueueTypes<D, BC> {
102    /// The inner dequeueing context.
103    type ReceiveQueueCtx<'a>: ReceiveQueueContext<
104            D,
105            BC,
106            Meta = Self::Meta,
107            Buffer = Self::Buffer,
108            DeviceId = Self::DeviceId,
109        > + ReceiveDequeFrameContext<
110            D,
111            BC,
112            Meta = Self::Meta,
113            Buffer = Self::Buffer,
114            DeviceId = Self::DeviceId,
115        >;
116
117    /// Calls the function with the RX deque state and the RX queue context.
118    fn with_dequed_frames_and_rx_queue_ctx<
119        O,
120        F: FnOnce(&mut DequeueState<Self::Meta, Self::Buffer>, &mut Self::ReceiveQueueCtx<'_>) -> O,
121    >(
122        &mut self,
123        device_id: &Self::DeviceId,
124        cb: F,
125    ) -> O;
126}
127
128/// An implementation of a receive queue, with a buffer.
129pub trait ReceiveQueueHandler<D: Device, BC>: ReceiveQueueTypes<D, BC> {
130    /// Queues a frame for reception.
131    ///
132    /// # Errors
133    ///
134    /// Returns an error with the metadata and body if the queue is full.
135    fn queue_rx_frame(
136        &mut self,
137        bindings_ctx: &mut BC,
138        device_id: &Self::DeviceId,
139        meta: Self::Meta,
140        body: Self::Buffer,
141    ) -> Result<(), ReceiveQueueFullError<(Self::Meta, Self::Buffer)>>;
142}
143
144impl<D: Device, BC: ReceiveQueueBindingsContext<CC::DeviceId>, CC: ReceiveQueueContext<D, BC>>
145    ReceiveQueueHandler<D, BC> for CC
146{
147    fn queue_rx_frame(
148        &mut self,
149        bindings_ctx: &mut BC,
150        device_id: &CC::DeviceId,
151        meta: CC::Meta,
152        body: CC::Buffer,
153    ) -> Result<(), ReceiveQueueFullError<(Self::Meta, CC::Buffer)>> {
154        self.with_receive_queue_mut(device_id, |ReceiveQueueState { queue }| {
155            queue.queue_rx_frame(meta, body).map(|res| match res {
156                EnqueueResult::QueueWasPreviouslyEmpty => bindings_ctx.wake_rx_task(device_id),
157                EnqueueResult::QueuePreviouslyWasOccupied => {
158                    // We have already woken up the RX task when the queue was
159                    // previously empty so there is no need to do it again.
160                }
161            })
162        })
163    }
164}
165
166#[cfg(test)]
167mod tests {
168    use super::*;
169
170    use alloc::vec;
171    use alloc::vec::Vec;
172    use assert_matches::assert_matches;
173
174    use netstack3_base::testutil::{
175        FakeBindingsCtx, FakeCoreCtx, FakeLinkDevice, FakeLinkDeviceId,
176    };
177    use netstack3_base::{ContextPair, CtxPair, WorkQueueReport};
178    use packet::Buf;
179
180    use crate::internal::queue::api::ReceiveQueueApi;
181    use crate::internal::queue::{BatchSize, MAX_RX_QUEUED_LEN};
182
183    #[derive(Default)]
184    struct FakeRxQueueState {
185        queue: ReceiveQueueState<(), Buf<Vec<u8>>>,
186        handled_frames: Vec<Buf<Vec<u8>>>,
187    }
188
189    #[derive(Default)]
190    struct FakeRxQueueBindingsCtxState {
191        woken_rx_tasks: Vec<FakeLinkDeviceId>,
192    }
193
194    type FakeCoreCtxImpl = FakeCoreCtx<FakeRxQueueState, (), FakeLinkDeviceId>;
195    type FakeBindingsCtxImpl = FakeBindingsCtx<(), (), FakeRxQueueBindingsCtxState, ()>;
196
197    impl ReceiveQueueBindingsContext<FakeLinkDeviceId> for FakeBindingsCtxImpl {
198        fn wake_rx_task(&mut self, device_id: &FakeLinkDeviceId) {
199            self.state.woken_rx_tasks.push(device_id.clone())
200        }
201    }
202
203    impl ReceiveQueueTypes<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
204        type Meta = ();
205        type Buffer = Buf<Vec<u8>>;
206    }
207
208    impl ReceiveQueueContext<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
209        fn with_receive_queue_mut<O, F: FnOnce(&mut ReceiveQueueState<(), Buf<Vec<u8>>>) -> O>(
210            &mut self,
211            &FakeLinkDeviceId: &FakeLinkDeviceId,
212            cb: F,
213        ) -> O {
214            cb(&mut self.state.queue)
215        }
216    }
217
218    impl ReceiveDequeFrameContext<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
219        fn handle_frame(
220            &mut self,
221            _bindings_ctx: &mut FakeBindingsCtxImpl,
222            &FakeLinkDeviceId: &FakeLinkDeviceId,
223            (): (),
224            buf: Buf<Vec<u8>>,
225        ) {
226            self.state.handled_frames.push(buf)
227        }
228    }
229
230    impl ReceiveDequeContext<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
231        type ReceiveQueueCtx<'a> = Self;
232
233        fn with_dequed_frames_and_rx_queue_ctx<
234            O,
235            F: FnOnce(
236                &mut DequeueState<Self::Meta, Self::Buffer>,
237                &mut Self::ReceiveQueueCtx<'_>,
238            ) -> O,
239        >(
240            &mut self,
241            &FakeLinkDeviceId: &FakeLinkDeviceId,
242            cb: F,
243        ) -> O {
244            cb(&mut DequeueState::default(), self)
245        }
246    }
247
248    /// A trait providing a shortcut to instantiate a [`TransmitQueueApi`] from a context.
249    trait ReceiveQueueApiExt: ContextPair + Sized {
250        fn receive_queue_api<D>(&mut self) -> ReceiveQueueApi<D, &mut Self> {
251            ReceiveQueueApi::new(self)
252        }
253    }
254
255    impl<O> ReceiveQueueApiExt for O where O: ContextPair + Sized {}
256
257    #[test]
258    fn queue_and_dequeue() {
259        let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());
260
261        for _ in 0..2 {
262            let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
263            for i in 0..MAX_RX_QUEUED_LEN {
264                let body = Buf::new(vec![i as u8], ..);
265                assert_eq!(
266                    ReceiveQueueHandler::queue_rx_frame(
267                        core_ctx,
268                        bindings_ctx,
269                        &FakeLinkDeviceId,
270                        (),
271                        body
272                    ),
273                    Ok(())
274                );
275                // We should only ever be woken up once when the first frame
276                // was enqueued.
277                assert_eq!(bindings_ctx.state.woken_rx_tasks, [FakeLinkDeviceId]);
278            }
279
280            let body = Buf::new(vec![131], ..);
281            assert_eq!(
282                ReceiveQueueHandler::queue_rx_frame(
283                    core_ctx,
284                    bindings_ctx,
285                    &FakeLinkDeviceId,
286                    (),
287                    body.clone(),
288                ),
289                Err(ReceiveQueueFullError(((), body)))
290            );
291            // We should only ever be woken up once when the first frame
292            // was enqueued.
293            assert_eq!(core::mem::take(&mut bindings_ctx.state.woken_rx_tasks), [FakeLinkDeviceId]);
294            assert!(MAX_RX_QUEUED_LEN > BatchSize::MAX);
295            for i in (0..(MAX_RX_QUEUED_LEN - BatchSize::MAX)).step_by(BatchSize::MAX) {
296                assert_eq!(
297                    ctx.receive_queue_api().handle_queued_frames(&FakeLinkDeviceId),
298                    WorkQueueReport::Pending
299                );
300                assert_eq!(
301                    core::mem::take(&mut ctx.core_ctx.state.handled_frames),
302                    (i..i + BatchSize::MAX)
303                        .map(|i| Buf::new(vec![i as u8], ..))
304                        .collect::<Vec<_>>()
305                );
306            }
307
308            assert_eq!(
309                ctx.receive_queue_api().handle_queued_frames(&FakeLinkDeviceId),
310                WorkQueueReport::AllDone
311            );
312            let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
313            assert_eq!(
314                core::mem::take(&mut core_ctx.state.handled_frames),
315                (BatchSize::MAX * (MAX_RX_QUEUED_LEN / BatchSize::MAX - 1)..MAX_RX_QUEUED_LEN)
316                    .map(|i| Buf::new(vec![i as u8], ..))
317                    .collect::<Vec<_>>()
318            );
319            // Should not have woken up the RX task since the queue should be
320            // empty.
321            assert_matches!(&core::mem::take(&mut bindings_ctx.state.woken_rx_tasks)[..], []);
322
323            // The queue should now be empty so the next iteration of queueing
324            // `MAX_RX_QUEUED_LEN` frames should succeed.
325        }
326    }
327}