1use core::fmt::Debug;
8
9use derivative::Derivative;
10use netstack3_base::sync::Mutex;
11use netstack3_base::{Device, DeviceIdContext};
12use packet::BufferMut;
13
14use crate::internal::queue::{fifo, DequeueState, EnqueueResult, ReceiveQueueFullError};
15
16#[derive(Derivative)]
19#[derivative(Default(bound = ""))]
20#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
21pub struct ReceiveQueueState<Meta, Buffer> {
22 pub(super) queue: fifo::Queue<Meta, Buffer>,
23}
24
25#[cfg(any(test, feature = "testutils"))]
26impl<Meta, Buffer> ReceiveQueueState<Meta, Buffer> {
27 pub fn take_frames(&mut self) -> impl Iterator<Item = (Meta, Buffer)> {
29 let Self { queue } = self;
30 let mut vec = Default::default();
31 assert_matches::assert_matches!(
32 queue.dequeue_into(&mut vec, usize::MAX),
33 crate::internal::queue::DequeueResult::NoMoreLeft
34 );
35 vec.into_iter()
36 }
37}
38
39pub trait ReceiveQueueBindingsContext<DeviceId> {
41 fn wake_rx_task(&mut self, device_id: &DeviceId);
48}
49
50#[derive(Derivative)]
52#[derivative(Default(bound = ""))]
53pub struct ReceiveQueue<Meta, Buffer> {
54 pub(crate) deque: Mutex<DequeueState<Meta, Buffer>>,
58 pub(crate) queue: Mutex<ReceiveQueueState<Meta, Buffer>>,
63}
64
65pub trait ReceiveQueueTypes<D: Device, BC>: DeviceIdContext<D> {
67 type Meta;
69
70 type Buffer: BufferMut + Debug;
72}
73
74pub trait ReceiveQueueContext<D: Device, BC>: ReceiveQueueTypes<D, BC> {
76 fn with_receive_queue_mut<O, F: FnOnce(&mut ReceiveQueueState<Self::Meta, Self::Buffer>) -> O>(
78 &mut self,
79 device_id: &Self::DeviceId,
80 cb: F,
81 ) -> O;
82}
83
84pub trait ReceiveDequeFrameContext<D: Device, BC>: ReceiveQueueTypes<D, BC> {
85 fn handle_frame(
87 &mut self,
88 bindings_ctx: &mut BC,
89 device_id: &Self::DeviceId,
90 meta: Self::Meta,
91 buf: Self::Buffer,
92 );
93}
94
95pub trait ReceiveDequeContext<D: Device, BC>: ReceiveQueueTypes<D, BC> {
97 type ReceiveQueueCtx<'a>: ReceiveQueueContext<
99 D,
100 BC,
101 Meta = Self::Meta,
102 Buffer = Self::Buffer,
103 DeviceId = Self::DeviceId,
104 > + ReceiveDequeFrameContext<
105 D,
106 BC,
107 Meta = Self::Meta,
108 Buffer = Self::Buffer,
109 DeviceId = Self::DeviceId,
110 >;
111
112 fn with_dequed_frames_and_rx_queue_ctx<
114 O,
115 F: FnOnce(&mut DequeueState<Self::Meta, Self::Buffer>, &mut Self::ReceiveQueueCtx<'_>) -> O,
116 >(
117 &mut self,
118 device_id: &Self::DeviceId,
119 cb: F,
120 ) -> O;
121}
122
123pub trait ReceiveQueueHandler<D: Device, BC>: ReceiveQueueTypes<D, BC> {
125 fn queue_rx_frame(
131 &mut self,
132 bindings_ctx: &mut BC,
133 device_id: &Self::DeviceId,
134 meta: Self::Meta,
135 body: Self::Buffer,
136 ) -> Result<(), ReceiveQueueFullError<(Self::Meta, Self::Buffer)>>;
137}
138
139impl<D: Device, BC: ReceiveQueueBindingsContext<CC::DeviceId>, CC: ReceiveQueueContext<D, BC>>
140 ReceiveQueueHandler<D, BC> for CC
141{
142 fn queue_rx_frame(
143 &mut self,
144 bindings_ctx: &mut BC,
145 device_id: &CC::DeviceId,
146 meta: CC::Meta,
147 body: CC::Buffer,
148 ) -> Result<(), ReceiveQueueFullError<(Self::Meta, CC::Buffer)>> {
149 self.with_receive_queue_mut(device_id, |ReceiveQueueState { queue }| {
150 queue.queue_rx_frame(meta, body).map(|res| match res {
151 EnqueueResult::QueueWasPreviouslyEmpty => bindings_ctx.wake_rx_task(device_id),
152 EnqueueResult::QueuePreviouslyWasOccupied => {
153 }
156 })
157 })
158 }
159}
160
161#[cfg(test)]
162mod tests {
163 use super::*;
164
165 use alloc::vec;
166 use alloc::vec::Vec;
167 use assert_matches::assert_matches;
168
169 use netstack3_base::testutil::{
170 FakeBindingsCtx, FakeCoreCtx, FakeLinkDevice, FakeLinkDeviceId,
171 };
172 use netstack3_base::{ContextPair, CtxPair, WorkQueueReport};
173 use packet::Buf;
174
175 use crate::internal::queue::api::ReceiveQueueApi;
176 use crate::internal::queue::{BatchSize, MAX_RX_QUEUED_LEN};
177
178 #[derive(Default)]
179 struct FakeRxQueueState {
180 queue: ReceiveQueueState<(), Buf<Vec<u8>>>,
181 handled_frames: Vec<Buf<Vec<u8>>>,
182 }
183
184 #[derive(Default)]
185 struct FakeRxQueueBindingsCtxState {
186 woken_rx_tasks: Vec<FakeLinkDeviceId>,
187 }
188
189 type FakeCoreCtxImpl = FakeCoreCtx<FakeRxQueueState, (), FakeLinkDeviceId>;
190 type FakeBindingsCtxImpl = FakeBindingsCtx<(), (), FakeRxQueueBindingsCtxState, ()>;
191
192 impl ReceiveQueueBindingsContext<FakeLinkDeviceId> for FakeBindingsCtxImpl {
193 fn wake_rx_task(&mut self, device_id: &FakeLinkDeviceId) {
194 self.state.woken_rx_tasks.push(device_id.clone())
195 }
196 }
197
198 impl ReceiveQueueTypes<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
199 type Meta = ();
200 type Buffer = Buf<Vec<u8>>;
201 }
202
203 impl ReceiveQueueContext<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
204 fn with_receive_queue_mut<O, F: FnOnce(&mut ReceiveQueueState<(), Buf<Vec<u8>>>) -> O>(
205 &mut self,
206 &FakeLinkDeviceId: &FakeLinkDeviceId,
207 cb: F,
208 ) -> O {
209 cb(&mut self.state.queue)
210 }
211 }
212
213 impl ReceiveDequeFrameContext<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
214 fn handle_frame(
215 &mut self,
216 _bindings_ctx: &mut FakeBindingsCtxImpl,
217 &FakeLinkDeviceId: &FakeLinkDeviceId,
218 (): (),
219 buf: Buf<Vec<u8>>,
220 ) {
221 self.state.handled_frames.push(buf)
222 }
223 }
224
225 impl ReceiveDequeContext<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
226 type ReceiveQueueCtx<'a> = Self;
227
228 fn with_dequed_frames_and_rx_queue_ctx<
229 O,
230 F: FnOnce(
231 &mut DequeueState<Self::Meta, Self::Buffer>,
232 &mut Self::ReceiveQueueCtx<'_>,
233 ) -> O,
234 >(
235 &mut self,
236 &FakeLinkDeviceId: &FakeLinkDeviceId,
237 cb: F,
238 ) -> O {
239 cb(&mut DequeueState::default(), self)
240 }
241 }
242
243 trait ReceiveQueueApiExt: ContextPair + Sized {
245 fn receive_queue_api<D>(&mut self) -> ReceiveQueueApi<D, &mut Self> {
246 ReceiveQueueApi::new(self)
247 }
248 }
249
250 impl<O> ReceiveQueueApiExt for O where O: ContextPair + Sized {}
251
252 #[test]
253 fn queue_and_dequeue() {
254 let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());
255
256 for _ in 0..2 {
257 let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
258 for i in 0..MAX_RX_QUEUED_LEN {
259 let body = Buf::new(vec![i as u8], ..);
260 assert_eq!(
261 ReceiveQueueHandler::queue_rx_frame(
262 core_ctx,
263 bindings_ctx,
264 &FakeLinkDeviceId,
265 (),
266 body
267 ),
268 Ok(())
269 );
270 assert_eq!(bindings_ctx.state.woken_rx_tasks, [FakeLinkDeviceId]);
273 }
274
275 let body = Buf::new(vec![131], ..);
276 assert_eq!(
277 ReceiveQueueHandler::queue_rx_frame(
278 core_ctx,
279 bindings_ctx,
280 &FakeLinkDeviceId,
281 (),
282 body.clone(),
283 ),
284 Err(ReceiveQueueFullError(((), body)))
285 );
286 assert_eq!(core::mem::take(&mut bindings_ctx.state.woken_rx_tasks), [FakeLinkDeviceId]);
289 assert!(MAX_RX_QUEUED_LEN > BatchSize::MAX);
290 for i in (0..(MAX_RX_QUEUED_LEN - BatchSize::MAX)).step_by(BatchSize::MAX) {
291 assert_eq!(
292 ctx.receive_queue_api().handle_queued_frames(&FakeLinkDeviceId),
293 WorkQueueReport::Pending
294 );
295 assert_eq!(
296 core::mem::take(&mut ctx.core_ctx.state.handled_frames),
297 (i..i + BatchSize::MAX)
298 .map(|i| Buf::new(vec![i as u8], ..))
299 .collect::<Vec<_>>()
300 );
301 }
302
303 assert_eq!(
304 ctx.receive_queue_api().handle_queued_frames(&FakeLinkDeviceId),
305 WorkQueueReport::AllDone
306 );
307 let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
308 assert_eq!(
309 core::mem::take(&mut core_ctx.state.handled_frames),
310 (BatchSize::MAX * (MAX_RX_QUEUED_LEN / BatchSize::MAX - 1)..MAX_RX_QUEUED_LEN)
311 .map(|i| Buf::new(vec![i as u8], ..))
312 .collect::<Vec<_>>()
313 );
314 assert_matches!(&core::mem::take(&mut bindings_ctx.state.woken_rx_tasks)[..], []);
317
318 }
321 }
322}