netstack3_device/queue/
rx.rs

// Copyright 2023 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! RX device queues.

use core::fmt::Debug;

use derivative::Derivative;
use netstack3_base::sync::Mutex;
use netstack3_base::{Device, DeviceIdContext};
use packet::BufferMut;

use crate::internal::queue::{fifo, DequeueState, EnqueueResult, ReceiveQueueFullError};

/// The state used to hold a queue of received frames to be handled at a later
/// time.
#[derive(Derivative)]
#[derivative(Default(bound = ""))]
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
pub struct ReceiveQueueState<Meta, Buffer> {
    pub(super) queue: fifo::Queue<Meta, Buffer>,
}

#[cfg(any(test, feature = "testutils"))]
impl<Meta, Buffer> ReceiveQueueState<Meta, Buffer> {
    /// Takes all the pending frames from the receive queue.
    pub fn take_frames(&mut self) -> impl Iterator<Item = (Meta, Buffer)> {
        let Self { queue } = self;
        let mut vec = Default::default();
        assert_matches::assert_matches!(
            queue.dequeue_into(&mut vec, usize::MAX),
            crate::internal::queue::DequeueResult::NoMoreLeft
        );
        vec.into_iter()
    }
}

/// The bindings context for the receive queue.
pub trait ReceiveQueueBindingsContext<DeviceId> {
    /// Signals to bindings that RX frames are available and ready to be
    /// handled by the device.
    ///
    /// Implementations must ensure that the API call to handle queued packets
    /// is scheduled as soon as possible so that enqueued RX frames are
    /// promptly handled.
    fn wake_rx_task(&mut self, device_id: &DeviceId);
}
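// Illustrative sketch only (the bindings type and notifier API below are
// hypothetical, not part of this crate): a bindings implementation typically
// forwards `wake_rx_task` to a per-device notifier so the task that drains
// this device's RX queue is scheduled promptly. Compare the fake
// implementation in the tests at the bottom of this file.
//
//     impl ReceiveQueueBindingsContext<DeviceId> for MyBindingsCtx {
//         fn wake_rx_task(&mut self, device_id: &DeviceId) {
//             // Signal the executor task responsible for draining this
//             // device's RX queue.
//             self.rx_task_notifier(device_id).notify();
//         }
//     }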

/// Holds queue and dequeue state for the receive queue.
#[derive(Derivative)]
#[derivative(Default(bound = ""))]
pub struct ReceiveQueue<Meta, Buffer> {
    /// The state for dequeued frames that will be handled.
    ///
    /// See `queue` for lock ordering.
    pub(crate) deque: Mutex<DequeueState<Meta, Buffer>>,
    /// A queue of received frames protected by a lock.
    ///
    /// Lock ordering: `deque` must be locked before `queue` is locked when both
    /// are needed at the same time.
    pub(crate) queue: Mutex<ReceiveQueueState<Meta, Buffer>>,
}
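// Lock-ordering sketch (illustrative only; the guard variable names are
// hypothetical): when both locks are needed, take `deque` first and `queue`
// second, per the ordering documented above.
//
//     let mut deque = rx_queue.deque.lock(); // 1st: dequeue state
//     let mut queue = rx_queue.queue.lock(); // 2nd: the FIFO itself
//     // ... dequeue a batch of frames from `queue` into `deque` ...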

/// Defines opaque types for frames in the receive queue.
pub trait ReceiveQueueTypes<D: Device, BC>: DeviceIdContext<D> {
    /// Metadata associated with an RX frame.
    type Meta;

    /// The type of buffer holding an RX frame.
    type Buffer: BufferMut + Debug;
}

/// The execution context for a receive queue.
pub trait ReceiveQueueContext<D: Device, BC>: ReceiveQueueTypes<D, BC> {
    /// Calls the function with the RX queue state.
    fn with_receive_queue_mut<O, F: FnOnce(&mut ReceiveQueueState<Self::Meta, Self::Buffer>) -> O>(
        &mut self,
        device_id: &Self::DeviceId,
        cb: F,
    ) -> O;
}

/// The execution context for handling frames dequeued from the receive queue.
pub trait ReceiveDequeFrameContext<D: Device, BC>: ReceiveQueueTypes<D, BC> {
    /// Handles a received frame.
    fn handle_frame(
        &mut self,
        bindings_ctx: &mut BC,
        device_id: &Self::DeviceId,
        meta: Self::Meta,
        buf: Self::Buffer,
    );
}

/// The core execution context for dequeuing frames from the receive queue.
pub trait ReceiveDequeContext<D: Device, BC>: ReceiveQueueTypes<D, BC> {
    /// The inner dequeuing context.
    type ReceiveQueueCtx<'a>: ReceiveQueueContext<
            D,
            BC,
            Meta = Self::Meta,
            Buffer = Self::Buffer,
            DeviceId = Self::DeviceId,
        > + ReceiveDequeFrameContext<
            D,
            BC,
            Meta = Self::Meta,
            Buffer = Self::Buffer,
            DeviceId = Self::DeviceId,
        >;

    /// Calls the function with the RX deque state and the RX queue context.
    fn with_dequed_frames_and_rx_queue_ctx<
        O,
        F: FnOnce(&mut DequeueState<Self::Meta, Self::Buffer>, &mut Self::ReceiveQueueCtx<'_>) -> O,
    >(
        &mut self,
        device_id: &Self::DeviceId,
        cb: F,
    ) -> O;
}

/// An implementation of a receive queue, with a buffer.
pub trait ReceiveQueueHandler<D: Device, BC>: ReceiveQueueTypes<D, BC> {
    /// Queues a frame for reception.
    ///
    /// # Errors
    ///
    /// Returns an error with the metadata and body if the queue is full.
    fn queue_rx_frame(
        &mut self,
        bindings_ctx: &mut BC,
        device_id: &Self::DeviceId,
        meta: Self::Meta,
        body: Self::Buffer,
    ) -> Result<(), ReceiveQueueFullError<(Self::Meta, Self::Buffer)>>;
}

impl<D: Device, BC: ReceiveQueueBindingsContext<CC::DeviceId>, CC: ReceiveQueueContext<D, BC>>
    ReceiveQueueHandler<D, BC> for CC
{
    fn queue_rx_frame(
        &mut self,
        bindings_ctx: &mut BC,
        device_id: &CC::DeviceId,
        meta: CC::Meta,
        body: CC::Buffer,
    ) -> Result<(), ReceiveQueueFullError<(Self::Meta, CC::Buffer)>> {
        self.with_receive_queue_mut(device_id, |ReceiveQueueState { queue }| {
            queue.queue_rx_frame(meta, body).map(|res| match res {
                EnqueueResult::QueueWasPreviouslyEmpty => bindings_ctx.wake_rx_task(device_id),
                EnqueueResult::QueuePreviouslyWasOccupied => {
                    // We have already woken up the RX task when the queue was
                    // previously empty so there is no need to do it again.
                }
            })
        })
    }
}
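// End-to-end flow (informal summary): `queue_rx_frame` wakes the bindings RX
// task only on the empty -> non-empty transition. The woken task is then
// expected to drain the queue through the receive queue API, roughly as
// follows (a sketch using the test-only `receive_queue_api` helper defined in
// the tests below):
//
//     while let WorkQueueReport::Pending =
//         ctx.receive_queue_api().handle_queued_frames(&device_id)
//     {
//         // Each call handles at most one batch of frames; keep going until
//         // the queue reports `AllDone`.
//     }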

#[cfg(test)]
mod tests {
    use super::*;

    use alloc::vec;
    use alloc::vec::Vec;
    use assert_matches::assert_matches;

    use netstack3_base::testutil::{
        FakeBindingsCtx, FakeCoreCtx, FakeLinkDevice, FakeLinkDeviceId,
    };
    use netstack3_base::{ContextPair, CtxPair, WorkQueueReport};
    use packet::Buf;

    use crate::internal::queue::api::ReceiveQueueApi;
    use crate::internal::queue::{BatchSize, MAX_RX_QUEUED_LEN};

    #[derive(Default)]
    struct FakeRxQueueState {
        queue: ReceiveQueueState<(), Buf<Vec<u8>>>,
        handled_frames: Vec<Buf<Vec<u8>>>,
    }

    #[derive(Default)]
    struct FakeRxQueueBindingsCtxState {
        woken_rx_tasks: Vec<FakeLinkDeviceId>,
    }

    type FakeCoreCtxImpl = FakeCoreCtx<FakeRxQueueState, (), FakeLinkDeviceId>;
    type FakeBindingsCtxImpl = FakeBindingsCtx<(), (), FakeRxQueueBindingsCtxState, ()>;

    impl ReceiveQueueBindingsContext<FakeLinkDeviceId> for FakeBindingsCtxImpl {
        fn wake_rx_task(&mut self, device_id: &FakeLinkDeviceId) {
            self.state.woken_rx_tasks.push(device_id.clone())
        }
    }

    impl ReceiveQueueTypes<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
        type Meta = ();
        type Buffer = Buf<Vec<u8>>;
    }

    impl ReceiveQueueContext<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
        fn with_receive_queue_mut<O, F: FnOnce(&mut ReceiveQueueState<(), Buf<Vec<u8>>>) -> O>(
            &mut self,
            &FakeLinkDeviceId: &FakeLinkDeviceId,
            cb: F,
        ) -> O {
            cb(&mut self.state.queue)
        }
    }

    impl ReceiveDequeFrameContext<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
        fn handle_frame(
            &mut self,
            _bindings_ctx: &mut FakeBindingsCtxImpl,
            &FakeLinkDeviceId: &FakeLinkDeviceId,
            (): (),
            buf: Buf<Vec<u8>>,
        ) {
            self.state.handled_frames.push(buf)
        }
    }

    impl ReceiveDequeContext<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
        type ReceiveQueueCtx<'a> = Self;

        fn with_dequed_frames_and_rx_queue_ctx<
            O,
            F: FnOnce(
                &mut DequeueState<Self::Meta, Self::Buffer>,
                &mut Self::ReceiveQueueCtx<'_>,
            ) -> O,
        >(
            &mut self,
            &FakeLinkDeviceId: &FakeLinkDeviceId,
            cb: F,
        ) -> O {
            cb(&mut DequeueState::default(), self)
        }
    }

    /// A trait providing a shortcut to instantiate a [`ReceiveQueueApi`] from a context.
    trait ReceiveQueueApiExt: ContextPair + Sized {
        fn receive_queue_api<D>(&mut self) -> ReceiveQueueApi<D, &mut Self> {
            ReceiveQueueApi::new(self)
        }
    }

    impl<O> ReceiveQueueApiExt for O where O: ContextPair + Sized {}

    #[test]
    fn queue_and_dequeue() {
        let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());

        for _ in 0..2 {
            let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
            for i in 0..MAX_RX_QUEUED_LEN {
                let body = Buf::new(vec![i as u8], ..);
                assert_eq!(
                    ReceiveQueueHandler::queue_rx_frame(
                        core_ctx,
                        bindings_ctx,
                        &FakeLinkDeviceId,
                        (),
                        body
                    ),
                    Ok(())
                );
                // We should only ever be woken up once when the first frame
                // was enqueued.
                assert_eq!(bindings_ctx.state.woken_rx_tasks, [FakeLinkDeviceId]);
            }

            let body = Buf::new(vec![131], ..);
            assert_eq!(
                ReceiveQueueHandler::queue_rx_frame(
                    core_ctx,
                    bindings_ctx,
                    &FakeLinkDeviceId,
                    (),
                    body.clone(),
                ),
                Err(ReceiveQueueFullError(((), body)))
            );
            // We should only ever be woken up once when the first frame
            // was enqueued.
            assert_eq!(core::mem::take(&mut bindings_ctx.state.woken_rx_tasks), [FakeLinkDeviceId]);
            assert!(MAX_RX_QUEUED_LEN > BatchSize::MAX);
            for i in (0..(MAX_RX_QUEUED_LEN - BatchSize::MAX)).step_by(BatchSize::MAX) {
                assert_eq!(
                    ctx.receive_queue_api().handle_queued_frames(&FakeLinkDeviceId),
                    WorkQueueReport::Pending
                );
                assert_eq!(
                    core::mem::take(&mut ctx.core_ctx.state.handled_frames),
                    (i..i + BatchSize::MAX)
                        .map(|i| Buf::new(vec![i as u8], ..))
                        .collect::<Vec<_>>()
                );
            }

            assert_eq!(
                ctx.receive_queue_api().handle_queued_frames(&FakeLinkDeviceId),
                WorkQueueReport::AllDone
            );
            let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
            assert_eq!(
                core::mem::take(&mut core_ctx.state.handled_frames),
                (BatchSize::MAX * (MAX_RX_QUEUED_LEN / BatchSize::MAX - 1)..MAX_RX_QUEUED_LEN)
                    .map(|i| Buf::new(vec![i as u8], ..))
                    .collect::<Vec<_>>()
            );
            // Should not have woken up the RX task since the queue should be
            // empty.
            assert_matches!(&core::mem::take(&mut bindings_ctx.state.woken_rx_tasks)[..], []);

            // The queue should now be empty so the next iteration of queueing
            // `MAX_RX_QUEUED_LEN` frames should succeed.
        }
    }
}