1use core::fmt::Debug;
8
9use derivative::Derivative;
10use netstack3_base::sync::Mutex;
11use netstack3_base::{Device, DeviceIdContext};
12use packet::BufferMut;
13
14use crate::internal::queue::{DequeueState, EnqueueResult, ReceiveQueueFullError, fifo};
15
16#[derive(Derivative)]
19#[derivative(Default(bound = ""))]
20#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
21pub struct ReceiveQueueState<Meta, Buffer> {
22 pub(super) queue: fifo::Queue<Meta, Buffer>,
23}
24
25#[cfg(any(test, feature = "testutils"))]
26impl<Meta, Buffer> ReceiveQueueState<Meta, Buffer> {
27 pub fn take_frames(&mut self) -> impl Iterator<Item = (Meta, Buffer)> + use<Meta, Buffer> {
29 let Self { queue } = self;
30 let mut vec = Default::default();
31 assert_matches::assert_matches!(
32 queue.dequeue_into(&mut vec, usize::MAX),
33 crate::internal::queue::DequeueResult::NoMoreLeft
34 );
35 vec.into_iter()
36 }
37
38 pub fn iter_frames(&self) -> impl Iterator<Item = &(Meta, Buffer)> {
40 self.queue.iter_frames()
41 }
42}
43
44pub trait ReceiveQueueBindingsContext<DeviceId> {
46 fn wake_rx_task(&mut self, device_id: &DeviceId);
53}
54
55#[derive(Derivative)]
57#[derivative(Default(bound = ""))]
58pub struct ReceiveQueue<Meta, Buffer> {
59 pub(crate) deque: Mutex<DequeueState<Meta, Buffer>>,
63 pub(crate) queue: Mutex<ReceiveQueueState<Meta, Buffer>>,
68}
69
70pub trait ReceiveQueueTypes<D: Device, BC>: DeviceIdContext<D> {
72 type Meta;
74
75 type Buffer: BufferMut + Debug;
77}
78
79pub trait ReceiveQueueContext<D: Device, BC>: ReceiveQueueTypes<D, BC> {
81 fn with_receive_queue_mut<O, F: FnOnce(&mut ReceiveQueueState<Self::Meta, Self::Buffer>) -> O>(
83 &mut self,
84 device_id: &Self::DeviceId,
85 cb: F,
86 ) -> O;
87}
88
89pub trait ReceiveDequeFrameContext<D: Device, BC>: ReceiveQueueTypes<D, BC> {
90 fn handle_frame(
92 &mut self,
93 bindings_ctx: &mut BC,
94 device_id: &Self::DeviceId,
95 meta: Self::Meta,
96 buf: Self::Buffer,
97 );
98}
99
100pub trait ReceiveDequeContext<D: Device, BC>: ReceiveQueueTypes<D, BC> {
102 type ReceiveQueueCtx<'a>: ReceiveQueueContext<
104 D,
105 BC,
106 Meta = Self::Meta,
107 Buffer = Self::Buffer,
108 DeviceId = Self::DeviceId,
109 > + ReceiveDequeFrameContext<
110 D,
111 BC,
112 Meta = Self::Meta,
113 Buffer = Self::Buffer,
114 DeviceId = Self::DeviceId,
115 >;
116
117 fn with_dequed_frames_and_rx_queue_ctx<
119 O,
120 F: FnOnce(&mut DequeueState<Self::Meta, Self::Buffer>, &mut Self::ReceiveQueueCtx<'_>) -> O,
121 >(
122 &mut self,
123 device_id: &Self::DeviceId,
124 cb: F,
125 ) -> O;
126}
127
128pub trait ReceiveQueueHandler<D: Device, BC>: ReceiveQueueTypes<D, BC> {
130 fn queue_rx_frame(
136 &mut self,
137 bindings_ctx: &mut BC,
138 device_id: &Self::DeviceId,
139 meta: Self::Meta,
140 body: Self::Buffer,
141 ) -> Result<(), ReceiveQueueFullError<(Self::Meta, Self::Buffer)>>;
142}
143
144impl<D: Device, BC: ReceiveQueueBindingsContext<CC::DeviceId>, CC: ReceiveQueueContext<D, BC>>
145 ReceiveQueueHandler<D, BC> for CC
146{
147 fn queue_rx_frame(
148 &mut self,
149 bindings_ctx: &mut BC,
150 device_id: &CC::DeviceId,
151 meta: CC::Meta,
152 body: CC::Buffer,
153 ) -> Result<(), ReceiveQueueFullError<(Self::Meta, CC::Buffer)>> {
154 self.with_receive_queue_mut(device_id, |ReceiveQueueState { queue }| {
155 queue.queue_rx_frame(meta, body).map(|res| match res {
156 EnqueueResult::QueueWasPreviouslyEmpty => bindings_ctx.wake_rx_task(device_id),
157 EnqueueResult::QueuePreviouslyWasOccupied => {
158 }
161 })
162 })
163 }
164}
165
166#[cfg(test)]
167mod tests {
168 use super::*;
169
170 use alloc::vec;
171 use alloc::vec::Vec;
172 use assert_matches::assert_matches;
173
174 use netstack3_base::testutil::{
175 FakeBindingsCtx, FakeCoreCtx, FakeLinkDevice, FakeLinkDeviceId,
176 };
177 use netstack3_base::{ContextPair, CtxPair, WorkQueueReport};
178 use packet::Buf;
179
180 use crate::internal::queue::api::ReceiveQueueApi;
181 use crate::internal::queue::{BatchSize, MAX_RX_QUEUED_LEN};
182
183 #[derive(Default)]
184 struct FakeRxQueueState {
185 queue: ReceiveQueueState<(), Buf<Vec<u8>>>,
186 handled_frames: Vec<Buf<Vec<u8>>>,
187 }
188
189 #[derive(Default)]
190 struct FakeRxQueueBindingsCtxState {
191 woken_rx_tasks: Vec<FakeLinkDeviceId>,
192 }
193
194 type FakeCoreCtxImpl = FakeCoreCtx<FakeRxQueueState, (), FakeLinkDeviceId>;
195 type FakeBindingsCtxImpl = FakeBindingsCtx<(), (), FakeRxQueueBindingsCtxState, ()>;
196
197 impl ReceiveQueueBindingsContext<FakeLinkDeviceId> for FakeBindingsCtxImpl {
198 fn wake_rx_task(&mut self, device_id: &FakeLinkDeviceId) {
199 self.state.woken_rx_tasks.push(device_id.clone())
200 }
201 }
202
203 impl ReceiveQueueTypes<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
204 type Meta = ();
205 type Buffer = Buf<Vec<u8>>;
206 }
207
208 impl ReceiveQueueContext<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
209 fn with_receive_queue_mut<O, F: FnOnce(&mut ReceiveQueueState<(), Buf<Vec<u8>>>) -> O>(
210 &mut self,
211 &FakeLinkDeviceId: &FakeLinkDeviceId,
212 cb: F,
213 ) -> O {
214 cb(&mut self.state.queue)
215 }
216 }
217
218 impl ReceiveDequeFrameContext<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
219 fn handle_frame(
220 &mut self,
221 _bindings_ctx: &mut FakeBindingsCtxImpl,
222 &FakeLinkDeviceId: &FakeLinkDeviceId,
223 (): (),
224 buf: Buf<Vec<u8>>,
225 ) {
226 self.state.handled_frames.push(buf)
227 }
228 }
229
230 impl ReceiveDequeContext<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
231 type ReceiveQueueCtx<'a> = Self;
232
233 fn with_dequed_frames_and_rx_queue_ctx<
234 O,
235 F: FnOnce(
236 &mut DequeueState<Self::Meta, Self::Buffer>,
237 &mut Self::ReceiveQueueCtx<'_>,
238 ) -> O,
239 >(
240 &mut self,
241 &FakeLinkDeviceId: &FakeLinkDeviceId,
242 cb: F,
243 ) -> O {
244 cb(&mut DequeueState::default(), self)
245 }
246 }
247
248 trait ReceiveQueueApiExt: ContextPair + Sized {
250 fn receive_queue_api<D>(&mut self) -> ReceiveQueueApi<D, &mut Self> {
251 ReceiveQueueApi::new(self)
252 }
253 }
254
255 impl<O> ReceiveQueueApiExt for O where O: ContextPair + Sized {}
256
257 #[test]
258 fn queue_and_dequeue() {
259 let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());
260
261 for _ in 0..2 {
262 let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
263 for i in 0..MAX_RX_QUEUED_LEN {
264 let body = Buf::new(vec![i as u8], ..);
265 assert_eq!(
266 ReceiveQueueHandler::queue_rx_frame(
267 core_ctx,
268 bindings_ctx,
269 &FakeLinkDeviceId,
270 (),
271 body
272 ),
273 Ok(())
274 );
275 assert_eq!(bindings_ctx.state.woken_rx_tasks, [FakeLinkDeviceId]);
278 }
279
280 let body = Buf::new(vec![131], ..);
281 assert_eq!(
282 ReceiveQueueHandler::queue_rx_frame(
283 core_ctx,
284 bindings_ctx,
285 &FakeLinkDeviceId,
286 (),
287 body.clone(),
288 ),
289 Err(ReceiveQueueFullError(((), body)))
290 );
291 assert_eq!(core::mem::take(&mut bindings_ctx.state.woken_rx_tasks), [FakeLinkDeviceId]);
294 assert!(MAX_RX_QUEUED_LEN > BatchSize::MAX);
295 for i in (0..(MAX_RX_QUEUED_LEN - BatchSize::MAX)).step_by(BatchSize::MAX) {
296 assert_eq!(
297 ctx.receive_queue_api().handle_queued_frames(&FakeLinkDeviceId),
298 WorkQueueReport::Pending
299 );
300 assert_eq!(
301 core::mem::take(&mut ctx.core_ctx.state.handled_frames),
302 (i..i + BatchSize::MAX)
303 .map(|i| Buf::new(vec![i as u8], ..))
304 .collect::<Vec<_>>()
305 );
306 }
307
308 assert_eq!(
309 ctx.receive_queue_api().handle_queued_frames(&FakeLinkDeviceId),
310 WorkQueueReport::AllDone
311 );
312 let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
313 assert_eq!(
314 core::mem::take(&mut core_ctx.state.handled_frames),
315 (BatchSize::MAX * (MAX_RX_QUEUED_LEN / BatchSize::MAX - 1)..MAX_RX_QUEUED_LEN)
316 .map(|i| Buf::new(vec![i as u8], ..))
317 .collect::<Vec<_>>()
318 );
319 assert_matches!(&core::mem::take(&mut bindings_ctx.state.woken_rx_tasks)[..], []);
322
323 }
326 }
327}