netstack3_device/queue/
fifo.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! FiFo device queue.
6
7use alloc::collections::VecDeque;
8
9use derivative::Derivative;
10
11use crate::internal::queue::{
12    DequeueResult, EnqueueResult, ReceiveQueueFullError, MAX_RX_QUEUED_LEN, MAX_TX_QUEUED_LEN,
13};
14
15/// A FiFo (First In, First Out) queue.
16///
17/// If the queue is full, no new entries will be accepted.
18#[derive(Derivative)]
19#[derivative(Default(bound = ""))]
20#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
21pub(super) struct Queue<Meta, Buffer> {
22    items: VecDeque<(Meta, Buffer)>,
23}
24
25impl<Meta, Buffer> Queue<Meta, Buffer> {
26    pub(crate) fn requeue_items(&mut self, source: &mut VecDeque<(Meta, Buffer)>) {
27        while let Some(p) = source.pop_back() {
28            self.items.push_front(p);
29        }
30    }
31
32    /// Dequeues items from this queue and pushes them to the back of the
33    /// sink.
34    ///
35    /// Note that this method takes an explicit `max_batch_size` argument
36    /// because the `VecDeque`'s capacity (via `VecDequeue::capacity`) may be
37    /// larger than some specified maximum batch size. Note that
38    /// [`VecDeque::with_capcity`] may allocate more capacity than specified.
39    pub(super) fn dequeue_into(
40        &mut self,
41        sink: &mut VecDeque<(Meta, Buffer)>,
42        max_batch_size: usize,
43    ) -> DequeueResult {
44        for _ in 0..max_batch_size {
45            match self.items.pop_front() {
46                Some(p) => sink.push_back(p),
47                // No more items.
48                None => break,
49            }
50        }
51
52        if self.items.is_empty() {
53            DequeueResult::NoMoreLeft
54        } else {
55            DequeueResult::MoreStillQueued
56        }
57    }
58
59    /// Attempts to add the RX frame to the queue.
60    pub(super) fn queue_rx_frame(
61        &mut self,
62        meta: Meta,
63        body: Buffer,
64    ) -> Result<EnqueueResult, ReceiveQueueFullError<(Meta, Buffer)>> {
65        let Self { items } = self;
66
67        let len = items.len();
68        if len == MAX_RX_QUEUED_LEN {
69            return Err(ReceiveQueueFullError((meta, body)));
70        }
71
72        items.push_back((meta, body));
73
74        Ok(if len == 0 {
75            EnqueueResult::QueueWasPreviouslyEmpty
76        } else {
77            EnqueueResult::QueuePreviouslyWasOccupied
78        })
79    }
80
81    /// Attempts to add the tx frame to the queue.
82    ///
83    /// The returned `QueueTxInserter` can insert a single entry into the queue.
84    pub(crate) fn tx_inserter(&mut self) -> Option<QueueTxInserter<'_, Meta, Buffer>> {
85        let Self { items } = self;
86        let len = items.len();
87        (len < MAX_TX_QUEUED_LEN).then_some(QueueTxInserter { queue: self, len })
88    }
89
90    pub(super) fn len(&self) -> usize {
91        let Self { items } = self;
92        items.len()
93    }
94}
95
96/// A type witnessing that a [`Queue`] has insertion space.
97pub(super) struct QueueTxInserter<'a, Meta, Buffer> {
98    /// The queue we're inserting into.
99    queue: &'a mut Queue<Meta, Buffer>,
100    /// The length of the `queue` upon `QueueTxInserter`'s creation.
101    len: usize,
102}
103
104impl<'a, Meta, Buffer> QueueTxInserter<'a, Meta, Buffer> {
105    /// Inserts a single entry in the queue.
106    pub(crate) fn insert(self, meta: Meta, buffer: Buffer) -> EnqueueResult {
107        let Self { queue: Queue { items }, len } = self;
108        items.push_back((meta, buffer));
109        if len == 0 {
110            EnqueueResult::QueueWasPreviouslyEmpty
111        } else {
112            EnqueueResult::QueuePreviouslyWasOccupied
113        }
114    }
115}
116
117#[cfg(test)]
118mod tests {
119    use super::*;
120
121    use packet::Buf;
122
123    #[test]
124    fn max_elements() {
125        let mut fifo = Queue::default();
126
127        let mut res = Ok(EnqueueResult::QueueWasPreviouslyEmpty);
128        for i in 0..MAX_RX_QUEUED_LEN {
129            let body = Buf::new([i as u8], ..);
130            assert_eq!(fifo.queue_rx_frame((), body), res);
131
132            // The result we expect after the first frame is enqueued.
133            res = Ok(EnqueueResult::QueuePreviouslyWasOccupied);
134        }
135
136        let frames =
137            (0..MAX_RX_QUEUED_LEN).map(|i| ((), Buf::new([i as u8], ..))).collect::<VecDeque<_>>();
138        assert_eq!(fifo.items, frames);
139
140        let body = Buf::new([131], ..);
141        assert_eq!(fifo.queue_rx_frame((), body.clone()), Err(ReceiveQueueFullError(((), body))));
142        assert_eq!(fifo.items, frames);
143    }
144}