netstack3_device/queue/
fifo.rsuse alloc::collections::VecDeque;
use derivative::Derivative;
use crate::internal::queue::{
DequeueResult, EnqueueResult, ReceiveQueueFullError, MAX_RX_QUEUED_LEN, MAX_TX_QUEUED_LEN,
};
#[derive(Derivative)]
#[derivative(Default(bound = ""))]
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
pub(super) struct Queue<Meta, Buffer> {
items: VecDeque<(Meta, Buffer)>,
}
impl<Meta, Buffer> Queue<Meta, Buffer> {
pub(crate) fn requeue_items(&mut self, source: &mut VecDeque<(Meta, Buffer)>) {
while let Some(p) = source.pop_back() {
self.items.push_front(p);
}
}
pub(super) fn dequeue_into(
&mut self,
sink: &mut VecDeque<(Meta, Buffer)>,
max_batch_size: usize,
) -> DequeueResult {
for _ in 0..max_batch_size {
match self.items.pop_front() {
Some(p) => sink.push_back(p),
None => break,
}
}
if self.items.is_empty() {
DequeueResult::NoMoreLeft
} else {
DequeueResult::MoreStillQueued
}
}
pub(super) fn queue_rx_frame(
&mut self,
meta: Meta,
body: Buffer,
) -> Result<EnqueueResult, ReceiveQueueFullError<(Meta, Buffer)>> {
let Self { items } = self;
let len = items.len();
if len == MAX_RX_QUEUED_LEN {
return Err(ReceiveQueueFullError((meta, body)));
}
items.push_back((meta, body));
Ok(if len == 0 {
EnqueueResult::QueueWasPreviouslyEmpty
} else {
EnqueueResult::QueuePreviouslyWasOccupied
})
}
pub(crate) fn tx_inserter(&mut self) -> Option<QueueTxInserter<'_, Meta, Buffer>> {
let Self { items } = self;
let len = items.len();
(len < MAX_TX_QUEUED_LEN).then(|| QueueTxInserter { queue: self, len })
}
pub(super) fn len(&self) -> usize {
let Self { items } = self;
items.len()
}
}
pub(super) struct QueueTxInserter<'a, Meta, Buffer> {
queue: &'a mut Queue<Meta, Buffer>,
len: usize,
}
impl<'a, Meta, Buffer> QueueTxInserter<'a, Meta, Buffer> {
pub(crate) fn insert(self, meta: Meta, buffer: Buffer) -> EnqueueResult {
let Self { queue: Queue { items }, len } = self;
items.push_back((meta, buffer));
if len == 0 {
EnqueueResult::QueueWasPreviouslyEmpty
} else {
EnqueueResult::QueuePreviouslyWasOccupied
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use packet::Buf;
#[test]
fn max_elements() {
let mut fifo = Queue::default();
let mut res = Ok(EnqueueResult::QueueWasPreviouslyEmpty);
for i in 0..MAX_RX_QUEUED_LEN {
let body = Buf::new([i as u8], ..);
assert_eq!(fifo.queue_rx_frame((), body), res);
res = Ok(EnqueueResult::QueuePreviouslyWasOccupied);
}
let frames =
(0..MAX_RX_QUEUED_LEN).map(|i| ((), Buf::new([i as u8], ..))).collect::<VecDeque<_>>();
assert_eq!(fifo.items, frames);
let body = Buf::new([131], ..);
assert_eq!(fifo.queue_rx_frame((), body.clone()), Err(ReceiveQueueFullError(((), body))));
assert_eq!(fifo.items, frames);
}
}