// netstack3_device/queue/fifo.rs
use alloc::collections::VecDeque;
8
9use derivative::Derivative;
10
11use crate::internal::queue::{
12 DequeueResult, EnqueueResult, ReceiveQueueFullError, MAX_RX_QUEUED_LEN, MAX_TX_QUEUED_LEN,
13};
14
15#[derive(Derivative)]
19#[derivative(Default(bound = ""))]
20#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
21pub(super) struct Queue<Meta, Buffer> {
22 items: VecDeque<(Meta, Buffer)>,
23}
24
25impl<Meta, Buffer> Queue<Meta, Buffer> {
26 pub(crate) fn requeue_items(&mut self, source: &mut VecDeque<(Meta, Buffer)>) {
27 while let Some(p) = source.pop_back() {
28 self.items.push_front(p);
29 }
30 }
31
32 pub(super) fn dequeue_into(
40 &mut self,
41 sink: &mut VecDeque<(Meta, Buffer)>,
42 max_batch_size: usize,
43 ) -> DequeueResult {
44 for _ in 0..max_batch_size {
45 match self.items.pop_front() {
46 Some(p) => sink.push_back(p),
47 None => break,
49 }
50 }
51
52 if self.items.is_empty() {
53 DequeueResult::NoMoreLeft
54 } else {
55 DequeueResult::MoreStillQueued
56 }
57 }
58
59 pub(super) fn queue_rx_frame(
61 &mut self,
62 meta: Meta,
63 body: Buffer,
64 ) -> Result<EnqueueResult, ReceiveQueueFullError<(Meta, Buffer)>> {
65 let Self { items } = self;
66
67 let len = items.len();
68 if len == MAX_RX_QUEUED_LEN {
69 return Err(ReceiveQueueFullError((meta, body)));
70 }
71
72 items.push_back((meta, body));
73
74 Ok(if len == 0 {
75 EnqueueResult::QueueWasPreviouslyEmpty
76 } else {
77 EnqueueResult::QueuePreviouslyWasOccupied
78 })
79 }
80
81 pub(crate) fn tx_inserter(&mut self) -> Option<QueueTxInserter<'_, Meta, Buffer>> {
85 let Self { items } = self;
86 let len = items.len();
87 (len < MAX_TX_QUEUED_LEN).then_some(QueueTxInserter { queue: self, len })
88 }
89
90 pub(super) fn len(&self) -> usize {
91 let Self { items } = self;
92 items.len()
93 }
94}
95
96pub(super) struct QueueTxInserter<'a, Meta, Buffer> {
98 queue: &'a mut Queue<Meta, Buffer>,
100 len: usize,
102}
103
104impl<'a, Meta, Buffer> QueueTxInserter<'a, Meta, Buffer> {
105 pub(crate) fn insert(self, meta: Meta, buffer: Buffer) -> EnqueueResult {
107 let Self { queue: Queue { items }, len } = self;
108 items.push_back((meta, buffer));
109 if len == 0 {
110 EnqueueResult::QueueWasPreviouslyEmpty
111 } else {
112 EnqueueResult::QueuePreviouslyWasOccupied
113 }
114 }
115}
116
#[cfg(test)]
mod tests {
    use super::*;

    use packet::Buf;

    /// Fills the queue with `MAX_RX_QUEUED_LEN` frames, then checks that one
    /// more frame is rejected and that the queued frames are left untouched.
    #[test]
    fn max_elements() {
        let mut fifo = Queue::default();

        for i in 0..MAX_RX_QUEUED_LEN {
            // Only the very first enqueue sees an empty queue.
            let expected = if i == 0 {
                EnqueueResult::QueueWasPreviouslyEmpty
            } else {
                EnqueueResult::QueuePreviouslyWasOccupied
            };
            assert_eq!(fifo.queue_rx_frame((), Buf::new([i as u8], ..)), Ok(expected));
        }

        let expected_frames: VecDeque<_> =
            (0..MAX_RX_QUEUED_LEN).map(|idx| ((), Buf::new([idx as u8], ..))).collect();
        assert_eq!(fifo.items, expected_frames);

        // The queue is full: the extra frame must be handed back in the error
        // and the queue contents must not change.
        let overflow = Buf::new([131], ..);
        assert_eq!(
            fifo.queue_rx_frame((), overflow.clone()),
            Err(ReceiveQueueFullError(((), overflow)))
        );
        assert_eq!(fifo.items, expected_frames);
    }
}