netstack3_device/queue/
fifo.rs1use alloc::collections::VecDeque;
8
9use derivative::Derivative;
10
11use crate::internal::queue::{
12 DequeueResult, EnqueueResult, MAX_RX_QUEUED_LEN, MAX_TX_QUEUED_LEN, ReceiveQueueFullError,
13};
14
15#[derive(Derivative)]
19#[derivative(Default(bound = ""))]
20#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
21pub(super) struct Queue<Meta, Buffer> {
22 items: VecDeque<(Meta, Buffer)>,
23}
24
25impl<Meta, Buffer> Queue<Meta, Buffer> {
26 pub(crate) fn requeue_items(&mut self, source: &mut VecDeque<(Meta, Buffer)>) {
27 while let Some(p) = source.pop_back() {
28 self.items.push_front(p);
29 }
30 }
31
32 pub(super) fn dequeue_into(
40 &mut self,
41 sink: &mut VecDeque<(Meta, Buffer)>,
42 max_batch_size: usize,
43 ) -> DequeueResult {
44 for _ in 0..max_batch_size {
45 match self.items.pop_front() {
46 Some(p) => sink.push_back(p),
47 None => break,
49 }
50 }
51
52 if self.items.is_empty() {
53 DequeueResult::NoMoreLeft
54 } else {
55 DequeueResult::MoreStillQueued
56 }
57 }
58
59 #[cfg(any(test, feature = "testutils"))]
60 pub(super) fn iter_frames(&self) -> impl Iterator<Item = &(Meta, Buffer)> {
61 self.items.iter()
62 }
63
64 pub(super) fn queue_rx_frame(
66 &mut self,
67 meta: Meta,
68 body: Buffer,
69 ) -> Result<EnqueueResult, ReceiveQueueFullError<(Meta, Buffer)>> {
70 let Self { items } = self;
71
72 let len = items.len();
73 if len == MAX_RX_QUEUED_LEN {
74 return Err(ReceiveQueueFullError((meta, body)));
75 }
76
77 items.push_back((meta, body));
78
79 Ok(if len == 0 {
80 EnqueueResult::QueueWasPreviouslyEmpty
81 } else {
82 EnqueueResult::QueuePreviouslyWasOccupied
83 })
84 }
85
86 pub(crate) fn tx_inserter(&mut self) -> Option<QueueTxInserter<'_, Meta, Buffer>> {
90 let Self { items } = self;
91 let len = items.len();
92 (len < MAX_TX_QUEUED_LEN).then_some(QueueTxInserter { queue: self, len })
93 }
94
95 pub(super) fn len(&self) -> usize {
96 let Self { items } = self;
97 items.len()
98 }
99}
100
101pub(super) struct QueueTxInserter<'a, Meta, Buffer> {
103 queue: &'a mut Queue<Meta, Buffer>,
105 len: usize,
107}
108
109impl<'a, Meta, Buffer> QueueTxInserter<'a, Meta, Buffer> {
110 pub(crate) fn insert(self, meta: Meta, buffer: Buffer) -> EnqueueResult {
112 let Self { queue: Queue { items }, len } = self;
113 items.push_back((meta, buffer));
114 if len == 0 {
115 EnqueueResult::QueueWasPreviouslyEmpty
116 } else {
117 EnqueueResult::QueuePreviouslyWasOccupied
118 }
119 }
120}
121
#[cfg(test)]
mod tests {
    use super::*;

    use packet::Buf;

    /// Fills the queue with `MAX_RX_QUEUED_LEN` frames, then checks that one
    /// more frame is rejected and the queue contents are left unchanged.
    #[test]
    fn max_elements() {
        // Builds the single-byte frame used for index `i`.
        let frame = |i: usize| Buf::new([i as u8], ..);

        let mut fifo = Queue::default();

        for i in 0..MAX_RX_QUEUED_LEN {
            // Only the very first enqueue sees an empty queue.
            let expected = if i == 0 {
                EnqueueResult::QueueWasPreviouslyEmpty
            } else {
                EnqueueResult::QueuePreviouslyWasOccupied
            };
            assert_eq!(fifo.queue_rx_frame((), frame(i)), Ok(expected));
        }

        let frames: VecDeque<_> = (0..MAX_RX_QUEUED_LEN).map(|i| ((), frame(i))).collect();
        assert_eq!(fifo.items, frames);

        // The queue is full: the frame is handed back inside the error.
        let body = Buf::new([131], ..);
        assert_eq!(fifo.queue_rx_frame((), body.clone()), Err(ReceiveQueueFullError(((), body))));
        assert_eq!(fifo.items, frames);
    }
}
149}