netstack3_device/queue/
fifo.rs

1// Copyright 2022 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! FiFo device queue.
6
7use alloc::collections::VecDeque;
8
9use derivative::Derivative;
10
11use crate::internal::queue::{
12    DequeueResult, EnqueueResult, MAX_RX_QUEUED_LEN, MAX_TX_QUEUED_LEN, ReceiveQueueFullError,
13};
14
15/// A FiFo (First In, First Out) queue.
16///
17/// If the queue is full, no new entries will be accepted.
18#[derive(Derivative)]
19#[derivative(Default(bound = ""))]
20#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
21pub(super) struct Queue<Meta, Buffer> {
22    items: VecDeque<(Meta, Buffer)>,
23}
24
25impl<Meta, Buffer> Queue<Meta, Buffer> {
26    pub(crate) fn requeue_items(&mut self, source: &mut VecDeque<(Meta, Buffer)>) {
27        while let Some(p) = source.pop_back() {
28            self.items.push_front(p);
29        }
30    }
31
32    /// Dequeues items from this queue and pushes them to the back of the
33    /// sink.
34    ///
35    /// Note that this method takes an explicit `max_batch_size` argument
36    /// because the `VecDeque`'s capacity (via `VecDequeue::capacity`) may be
37    /// larger than some specified maximum batch size. Note that
38    /// [`VecDeque::with_capcity`] may allocate more capacity than specified.
39    pub(super) fn dequeue_into(
40        &mut self,
41        sink: &mut VecDeque<(Meta, Buffer)>,
42        max_batch_size: usize,
43    ) -> DequeueResult {
44        for _ in 0..max_batch_size {
45            match self.items.pop_front() {
46                Some(p) => sink.push_back(p),
47                // No more items.
48                None => break,
49            }
50        }
51
52        if self.items.is_empty() {
53            DequeueResult::NoMoreLeft
54        } else {
55            DequeueResult::MoreStillQueued
56        }
57    }
58
59    #[cfg(any(test, feature = "testutils"))]
60    pub(super) fn iter_frames(&self) -> impl Iterator<Item = &(Meta, Buffer)> {
61        self.items.iter()
62    }
63
64    /// Attempts to add the RX frame to the queue.
65    pub(super) fn queue_rx_frame(
66        &mut self,
67        meta: Meta,
68        body: Buffer,
69    ) -> Result<EnqueueResult, ReceiveQueueFullError<(Meta, Buffer)>> {
70        let Self { items } = self;
71
72        let len = items.len();
73        if len == MAX_RX_QUEUED_LEN {
74            return Err(ReceiveQueueFullError((meta, body)));
75        }
76
77        items.push_back((meta, body));
78
79        Ok(if len == 0 {
80            EnqueueResult::QueueWasPreviouslyEmpty
81        } else {
82            EnqueueResult::QueuePreviouslyWasOccupied
83        })
84    }
85
86    /// Attempts to add the tx frame to the queue.
87    ///
88    /// The returned `QueueTxInserter` can insert a single entry into the queue.
89    pub(crate) fn tx_inserter(&mut self) -> Option<QueueTxInserter<'_, Meta, Buffer>> {
90        let Self { items } = self;
91        let len = items.len();
92        (len < MAX_TX_QUEUED_LEN).then_some(QueueTxInserter { queue: self, len })
93    }
94
95    pub(super) fn len(&self) -> usize {
96        let Self { items } = self;
97        items.len()
98    }
99}
100
101/// A type witnessing that a [`Queue`] has insertion space.
102pub(super) struct QueueTxInserter<'a, Meta, Buffer> {
103    /// The queue we're inserting into.
104    queue: &'a mut Queue<Meta, Buffer>,
105    /// The length of the `queue` upon `QueueTxInserter`'s creation.
106    len: usize,
107}
108
109impl<'a, Meta, Buffer> QueueTxInserter<'a, Meta, Buffer> {
110    /// Inserts a single entry in the queue.
111    pub(crate) fn insert(self, meta: Meta, buffer: Buffer) -> EnqueueResult {
112        let Self { queue: Queue { items }, len } = self;
113        items.push_back((meta, buffer));
114        if len == 0 {
115            EnqueueResult::QueueWasPreviouslyEmpty
116        } else {
117            EnqueueResult::QueuePreviouslyWasOccupied
118        }
119    }
120}
121
#[cfg(test)]
mod tests {
    use super::*;

    use packet::Buf;

    /// Fills the queue to `MAX_RX_QUEUED_LEN` and checks that one further
    /// frame is rejected without disturbing the queued frames.
    #[test]
    fn max_elements() {
        let mut fifo = Queue::default();

        for i in 0..MAX_RX_QUEUED_LEN {
            // Only the very first frame finds the queue empty.
            let expected = if i == 0 {
                EnqueueResult::QueueWasPreviouslyEmpty
            } else {
                EnqueueResult::QueuePreviouslyWasOccupied
            };
            let body = Buf::new([i as u8], ..);
            assert_eq!(fifo.queue_rx_frame((), body), Ok(expected));
        }

        // The queue must now hold exactly the frames pushed above, in order.
        let expected_frames: VecDeque<_> =
            (0..MAX_RX_QUEUED_LEN).map(|i| ((), Buf::new([i as u8], ..))).collect();
        assert_eq!(fifo.items, expected_frames);

        // A frame offered to a full queue is handed back inside the error and
        // the queue contents are unchanged.
        let rejected = Buf::new([131], ..);
        assert_eq!(
            fifo.queue_rx_frame((), rejected.clone()),
            Err(ReceiveQueueFullError(((), rejected)))
        );
        assert_eq!(fifo.items, expected_frames);
    }
}