Skip to main content

netstack3_device/queue/
tx.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! TX device queues.
6
7use core::convert::Infallible as Never;
8
9use alloc::vec::Vec;
10
11use derivative::Derivative;
12use log::trace;
13use netstack3_base::sync::Mutex;
14use netstack3_base::{
15    ChecksumOffloadResult, ChecksumOffloadSpec, Device, DeviceIdContext, ErrorAndSerializer,
16    NetworkSerializationContext, NetworkSerializer, TxMetadata,
17};
18use packet::{Buf, FragmentedBuffer as _, NoReuseBufferProvider, ReusableBuffer, new_buf_vec};
19
20use crate::internal::base::{DeviceBufferBindingsTypes, DeviceSendFrameError};
21use crate::internal::queue::{
22    DequeueState, DeviceBufferSpec, EnqueueResult, TransmitQueueFrameError, fifo,
23};
24use crate::internal::socket::{DeviceSocketHandler, ParseSentFrameError, SentFrame};
25
26/// State associated with a device transmit queue.
27#[derive(Derivative)]
28#[derivative(Default(bound = "Allocator: Default"))]
29pub struct TransmitQueueState<Meta, Buffer, Allocator> {
30    pub(super) allocator: ByteSliceAllocator<Allocator, Buffer>,
31    pub(super) queue: Option<fifo::Queue<Meta, Buffer>>,
32    pub(super) checksum_offload_spec: ChecksumOffloadSpec,
33}
34
35/// Holds queue and dequeue state for the transmit queue.
36pub struct TransmitQueue<Meta, Buffer, Allocator> {
37    /// The state for dequeued packets that will be handled.
38    ///
39    /// See `queue` for lock ordering.
40    pub(crate) deque: Mutex<DequeueState<Meta, Buffer>>,
41    /// A queue of to-be-transmitted packets protected by a lock.
42    ///
43    /// Lock ordering: `deque` must be locked before `queue` is locked when both
44    /// are needed at the same time.
45    pub(crate) queue: Mutex<TransmitQueueState<Meta, Buffer, Allocator>>,
46}
47
48impl<Meta, Buffer, Allocator> TransmitQueue<Meta, Buffer, Allocator> {
49    pub(crate) fn new(allocator: Allocator, checksum_offload_spec: ChecksumOffloadSpec) -> Self {
50        Self {
51            deque: Mutex::new(DequeueState::default()),
52            queue: Mutex::new(TransmitQueueState {
53                allocator: ByteSliceAllocator::new(allocator),
54                queue: None,
55                checksum_offload_spec,
56            }),
57        }
58    }
59}
60
61/// The bindings context for the transmit queue.
62pub trait TransmitQueueBindingsContext<DeviceId>: DeviceBufferBindingsTypes {
63    /// Signals to bindings that TX frames are available and ready to be sent
64    /// over the device.
65    ///
66    /// Implementations must make sure that the API call to handle queued
67    /// packets is scheduled to be called as soon as possible so that enqueued
68    /// TX frames are promptly handled.
69    fn wake_tx_task(&mut self, device_id: &DeviceId);
70}
71
72/// A trait for metadata that is attached to a frame in the queue.
73pub trait TxQueuePacketMetadataCommon {
74    /// Sets the checksum offload result.
75    fn set_checksum_offload_result(&mut self, result: Option<ChecksumOffloadResult>);
76}
77
78impl<T: TxMetadata> TxQueuePacketMetadataCommon for T {
79    fn set_checksum_offload_result(&mut self, result: Option<ChecksumOffloadResult>) {
80        TxMetadata::set_checksum_offload_result(self, result);
81    }
82}
83
84/// Basic definitions for a transmit queue.
85pub trait TransmitQueueCommon<D: Device, C>: DeviceIdContext<D> {
86    /// The metadata associated with every packet in the queue.
87    type Meta: TxQueuePacketMetadataCommon;
88
89    /// The context given to `send_frame` when dequeueing.
90    type DequeueContext;
91
92    /// Parses an outgoing frame for packet socket delivery.
93    fn parse_outgoing_frame<'a, 'b>(
94        buf: &'a [u8],
95        meta: &'a Self::Meta,
96    ) -> Result<SentFrame<&'a [u8]>, ParseSentFrameError>;
97}
98
99/// The execution context for a transmit queue.
100pub trait TransmitQueueContext<D: DeviceBufferSpec<BC>, BC>: TransmitQueueCommon<D, BC> {
101    /// Calls `cb` with mutable access to the queue state.
102    fn with_transmit_queue_mut<
103        O,
104        F: FnOnce(&mut TransmitQueueState<Self::Meta, D::TxBuffer, D::TxAllocator>) -> O,
105    >(
106        &mut self,
107        device_id: &Self::DeviceId,
108        cb: F,
109    ) -> O;
110
111    /// Calls `cb` with immutable access to the queue state.
112    fn with_transmit_queue<
113        O,
114        F: FnOnce(&TransmitQueueState<Self::Meta, D::TxBuffer, D::TxAllocator>) -> O,
115    >(
116        &mut self,
117        device_id: &Self::DeviceId,
118        cb: F,
119    ) -> O;
120
121    /// Send a frame out the device.
122    ///
123    /// This method may not block - if the device is not ready, an appropriate
124    /// error must be returned.
125    fn send_frame(
126        &mut self,
127        bindings_ctx: &mut BC,
128        device_id: &Self::DeviceId,
129        dequeue_context: Option<&mut Self::DequeueContext>,
130        meta: Self::Meta,
131        buf: D::TxBuffer,
132    ) -> Result<(), DeviceSendFrameError>;
133}
134
135/// The core execution context for dequeueing TX frames from the transmit queue.
136pub trait TransmitDequeueContext<D: DeviceBufferSpec<BC>, BC>: TransmitQueueContext<D, BC> {
137    /// The inner context providing dequeuing.
138    type TransmitQueueCtx<'a>: TransmitQueueContext<
139            D,
140            BC,
141            Meta = Self::Meta,
142            DequeueContext = Self::DequeueContext,
143            DeviceId = Self::DeviceId,
144        > + DeviceSocketHandler<D, BC>;
145
146    /// Calls the function with the TX deque state and the TX queue context.
147    fn with_dequed_packets_and_tx_queue_ctx<
148        O,
149        F: FnOnce(&mut DequeueState<Self::Meta, D::TxBuffer>, &mut Self::TransmitQueueCtx<'_>) -> O,
150    >(
151        &mut self,
152        device_id: &Self::DeviceId,
153        cb: F,
154    ) -> O;
155}
156
157/// The configuration for a transmit queue.
158pub enum TransmitQueueConfiguration {
159    /// No queue.
160    None,
161    /// FiFo queue.
162    Fifo,
163}
164
165/// An implementation of a transmit queue that stores egress frames.
166pub trait TransmitQueueHandler<D: Device, BC>: TransmitQueueCommon<D, BC> {
167    /// Queues a frame for transmission.
168    fn queue_tx_frame<S>(
169        &mut self,
170        bindings_ctx: &mut BC,
171        device_id: &Self::DeviceId,
172        meta: Self::Meta,
173        body: S,
174    ) -> Result<usize, TransmitQueueFrameError<S>>
175    where
176        S: NetworkSerializer,
177        S::Buffer: ReusableBuffer;
178}
179
180pub(super) fn deliver_to_device_sockets<
181    D: DeviceBufferSpec<BC>,
182    BC: TransmitQueueBindingsContext<CC::DeviceId>,
183    CC: TransmitQueueCommon<D, BC> + DeviceSocketHandler<D, BC>,
184>(
185    core_ctx: &mut CC,
186    bindings_ctx: &mut BC,
187    device_id: &CC::DeviceId,
188    buffer: &D::TxBuffer,
189    meta: &CC::Meta,
190) {
191    buffer.with_bytes(|b| {
192        let mut handle_parsed = |bytes: &[u8]| match CC::parse_outgoing_frame(bytes, meta) {
193            Ok(sent_frame) => DeviceSocketHandler::handle_frame(
194                core_ctx,
195                bindings_ctx,
196                device_id,
197                sent_frame.into(),
198                bytes,
199            ),
200            Err(ParseSentFrameError) => {
201                trace!("failed to parse outgoing frame on {:?} ({} bytes)", device_id, bytes.len())
202            }
203        };
204
205        if let Some(bytes) = b.try_get_contiguous() {
206            handle_parsed(bytes)
207        } else {
208            handle_parsed(&b.to_flattened_vec())
209        }
210    })
211}
212
213impl EnqueueResult {
214    fn maybe_wake_tx<D, BC: TransmitQueueBindingsContext<D>>(
215        self,
216        bindings_ctx: &mut BC,
217        device_id: &D,
218    ) {
219        match self {
220            Self::QueuePreviouslyWasOccupied => (),
221            Self::QueueWasPreviouslyEmpty => bindings_ctx.wake_tx_task(device_id),
222        }
223    }
224}
225
226enum EnqueueStatus<Meta, Buffer> {
227    NotAttempted(Meta, Buffer),
228    Attempted,
229}
230
231// Extracted to a function without the generic serializer parameter to ease code
232// generation.
233fn insert_and_notify<
234    D: DeviceBufferSpec<BC>,
235    BC: TransmitQueueBindingsContext<CC::DeviceId>,
236    CC: TransmitQueueContext<D, BC> + DeviceSocketHandler<D, BC>,
237>(
238    bindings_ctx: &mut BC,
239    device_id: &CC::DeviceId,
240    inserter: Option<fifo::QueueTxInserter<'_, CC::Meta, D::TxBuffer>>,
241    meta: CC::Meta,
242    body: D::TxBuffer,
243) -> EnqueueStatus<CC::Meta, D::TxBuffer> {
244    match inserter {
245        // No TX queue so send the frame immediately.
246        None => EnqueueStatus::NotAttempted(meta, body),
247        Some(inserter) => {
248            inserter.insert(meta, body).maybe_wake_tx(bindings_ctx, device_id);
249            EnqueueStatus::Attempted
250        }
251    }
252}
253
254// Extracted to a function without the generic serializer parameter to ease code
255// generation.
256fn handle_post_enqueue<
257    D: DeviceBufferSpec<BC>,
258    BC: TransmitQueueBindingsContext<CC::DeviceId>,
259    CC: TransmitQueueContext<D, BC> + DeviceSocketHandler<D, BC>,
260>(
261    core_ctx: &mut CC,
262    bindings_ctx: &mut BC,
263    device_id: &CC::DeviceId,
264    status: EnqueueStatus<CC::Meta, D::TxBuffer>,
265) -> Result<(), DeviceSendFrameError> {
266    match status {
267        EnqueueStatus::NotAttempted(meta, body) => {
268            // TODO(https://fxbug.dev/42077654): Deliver the frame to packet
269            // sockets and to the device atomically.
270            deliver_to_device_sockets(core_ctx, bindings_ctx, device_id, &body, &meta);
271            // Send the frame while not holding the TX queue exclusively to
272            // not block concurrent senders from making progress.
273            core_ctx.send_frame(bindings_ctx, device_id, None, meta, body)
274        }
275        EnqueueStatus::Attempted => Ok(()),
276    }
277}
278
279impl<
280    D: DeviceBufferSpec<BC>,
281    BC: TransmitQueueBindingsContext<CC::DeviceId>,
282    CC: TransmitQueueContext<D, BC> + DeviceSocketHandler<D, BC>,
283> TransmitQueueHandler<D, BC> for CC
284{
285    fn queue_tx_frame<S>(
286        &mut self,
287        bindings_ctx: &mut BC,
288        device_id: &CC::DeviceId,
289        mut meta: CC::Meta,
290        body: S,
291    ) -> Result<usize, TransmitQueueFrameError<S>>
292    where
293        S: NetworkSerializer,
294        S::Buffer: ReusableBuffer,
295    {
296        let (len, result) = self.with_transmit_queue_mut(
297            device_id,
298            |TransmitQueueState { allocator, queue, checksum_offload_spec }| {
299                let queue_len = queue.as_ref().map_or(0, |q| q.len());
300                let inserter = match queue {
301                    None => None,
302                    Some(q) => match q.tx_inserter() {
303                        Some(i) => Some(i),
304                        None => return Err(TransmitQueueFrameError::QueueFull(body)),
305                    },
306                };
307                let mut context = NetworkSerializationContext::new(checksum_offload_spec.clone());
308                let body = body
309                    .serialize_outer(
310                        &mut context,
311                        NoReuseBufferProvider(BufferAllocAdaptor {
312                            allocator: &mut *allocator,
313                            queue_len,
314                        }),
315                    )
316                    .map_err(|(e, serializer)| {
317                        TransmitQueueFrameError::SerializeError(ErrorAndSerializer {
318                            serializer,
319                            error: e.map_alloc(
320                                |_: <D::TxAllocator as TxBufferAllocator<D::TxBuffer>>::Error| (),
321                            ),
322                        })
323                    })?;
324                meta.set_checksum_offload_result(context.csum_offload_result());
325                let len = body.len();
326                let result = insert_and_notify::<_, _, CC>(
327                    bindings_ctx,
328                    device_id,
329                    inserter,
330                    meta,
331                    allocator.get_buffer(),
332                );
333                Ok((len, result))
334            },
335        )?;
336
337        handle_post_enqueue(self, bindings_ctx, device_id, result)
338            .map(|()| len)
339            .map_err(TransmitQueueFrameError::NoQueue)
340    }
341}
342
343/// Allocator for Tx buffers to be stored in Tx queue.
344pub trait TxBufferAllocator<Buffer> {
345    /// Error for allocating a Tx buffer.
346    type Error;
347
348    /// Allocate Tx buffer. This method is only called while the TX queue is
349    /// locked. `queue_len` is the current length of the TX queue.
350    fn alloc(&mut self, len: usize, queue_len: usize) -> Result<Buffer, Self::Error>;
351}
352
353/// Turns a `TxBufferAllocator` into a buffer allocator for `Buf<&mut [u8]>`.
354/// The caller can serialize into the returned `Buf<&mut [u8]>` and retrieve
355/// the buffer after serialization. This avoids the binary bloat by not adding
356/// serialization code for different buffer types.
357#[derive(Derivative)]
358#[derivative(Default(bound = "A: Default"))]
359pub(super) struct ByteSliceAllocator<A, B> {
360    allocator: A,
361    buffer: Option<B>,
362}
363
364impl<A, B> ByteSliceAllocator<A, B> {
365    fn new(allocator: A) -> Self {
366        Self { allocator, buffer: None }
367    }
368}
369
370impl<A: TxBufferAllocator<B>, B: AsMut<[u8]>> ByteSliceAllocator<A, B> {
371    /// Returns a borrowed slice that the caller can serialize into.
372    fn alloc_buf_slice(
373        &mut self,
374        len: usize,
375        queue_len: usize,
376    ) -> Result<Buf<&mut [u8]>, A::Error> {
377        let old = self.buffer.replace(self.allocator.alloc(len, queue_len)?);
378        debug_assert!(old.is_none());
379        // The allocator may allocate a buffer larger than requested to fulfill
380        // the minimum tx length. We cap it at `len`.
381        Ok(Buf::new(&mut self.buffer.as_mut().expect("must be set").as_mut()[..len], ..))
382    }
383
384    /// This is intended to obtain the ownership of the buffer and store it
385    /// into the tx queue.
386    fn get_buffer(&mut self) -> B {
387        self.buffer.take().expect("must be allocated")
388    }
389}
390
391struct BufferAllocAdaptor<'a, A, B> {
392    allocator: &'a mut ByteSliceAllocator<A, B>,
393    queue_len: usize,
394}
395
396impl<'a, A, B> packet::BufferAlloc<Buf<&'a mut [u8]>> for BufferAllocAdaptor<'a, A, B>
397where
398    B: AsMut<[u8]>,
399    A: TxBufferAllocator<B>,
400{
401    type Error = A::Error;
402
403    fn alloc(self, len: usize) -> Result<Buf<&'a mut [u8]>, Self::Error> {
404        self.allocator.alloc_buf_slice(len, self.queue_len)
405    }
406}
407
408/// An allocator of [`Buf<Vec<u8>>`] .
409#[derive(Default)]
410pub struct BufVecU8Allocator;
411
412impl TxBufferAllocator<Buf<Vec<u8>>> for BufVecU8Allocator {
413    type Error = Never;
414
415    fn alloc(&mut self, len: usize, _qlen: usize) -> Result<Buf<Vec<u8>>, Self::Error> {
416        new_buf_vec(len)
417    }
418}
419
420#[cfg(test)]
421mod tests {
422    use super::*;
423
424    use alloc::vec;
425
426    use assert_matches::assert_matches;
427    use net_declare::net_mac;
428    use net_types::ethernet::Mac;
429    use netstack3_base::testutil::{
430        FakeBindingsCtx, FakeCoreCtx, FakeLinkDevice, FakeLinkDeviceId, FakeTxMetadata,
431    };
432
433    impl<BT> DeviceBufferSpec<BT> for FakeLinkDevice {
434        type TxBuffer = packet::Buf<Vec<u8>>;
435        type TxAllocator = BufVecU8Allocator;
436    }
437    use netstack3_base::{
438        ContextPair, CounterContext, CtxPair, ResourceCounterContext, WorkQueueReport,
439    };
440    use test_case::test_case;
441
442    use crate::DeviceCounters;
443    use crate::internal::queue::api::TransmitQueueApi;
444    use crate::internal::queue::{BatchSize, MAX_TX_QUEUED_LEN};
445    use crate::internal::socket::{EthernetFrame, Frame};
446
447    #[derive(Default)]
448    struct FakeTxQueueState {
449        queue: TransmitQueueState<FakeTxMetadata, Buf<Vec<u8>>, BufVecU8Allocator>,
450        transmitted_packets: Vec<(Buf<Vec<u8>>, Option<DequeueContext>)>,
451        no_buffers: bool,
452        stack_wide_device_counters: DeviceCounters,
453        per_device_counters: DeviceCounters,
454    }
455
456    #[derive(Default)]
457    struct FakeTxQueueBindingsCtxState {
458        woken_tx_tasks: Vec<FakeLinkDeviceId>,
459        delivered_to_sockets: Vec<Frame<Vec<u8>>>,
460    }
461
462    type FakeCoreCtxImpl = FakeCoreCtx<FakeTxQueueState, FakeTxMetadata, FakeLinkDeviceId>;
463    type FakeBindingsCtxImpl = FakeBindingsCtx<(), (), FakeTxQueueBindingsCtxState, ()>;
464
465    impl TransmitQueueBindingsContext<FakeLinkDeviceId> for FakeBindingsCtxImpl {
466        fn wake_tx_task(&mut self, device_id: &FakeLinkDeviceId) {
467            self.state.woken_tx_tasks.push(device_id.clone())
468        }
469    }
470
471    const SRC_MAC: Mac = net_mac!("AA:BB:CC:DD:EE:FF");
472    const DEST_MAC: Mac = net_mac!("FF:EE:DD:CC:BB:AA");
473
474    #[derive(Copy, Clone, Debug, Eq, PartialEq)]
475    struct DequeueContext;
476
477    impl TransmitQueueCommon<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
478        type DequeueContext = DequeueContext;
479        type Meta = FakeTxMetadata;
480
481        fn parse_outgoing_frame<'a, 'b>(
482            buf: &'a [u8],
483            _meta: &'b Self::Meta,
484        ) -> Result<SentFrame<&'a [u8]>, ParseSentFrameError> {
485            Ok(fake_sent_ethernet_with_body(buf))
486        }
487    }
488
489    fn fake_sent_ethernet_with_body<B>(body: B) -> SentFrame<B> {
490        SentFrame::Ethernet(EthernetFrame {
491            src_mac: SRC_MAC,
492            dst_mac: DEST_MAC,
493            ethertype: None,
494            body_offset: 0,
495            body,
496        })
497    }
498
499    /// A trait providing a shortcut to instantiate a [`TransmitQueueApi`] from a context.
500    trait TransmitQueueApiExt: ContextPair + Sized {
501        fn transmit_queue_api<D>(&mut self) -> TransmitQueueApi<D, &mut Self> {
502            TransmitQueueApi::new(self)
503        }
504    }
505
506    impl<O> TransmitQueueApiExt for O where O: ContextPair + Sized {}
507
508    impl TransmitQueueContext<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
509        fn with_transmit_queue<
510            O,
511            F: FnOnce(&TransmitQueueState<FakeTxMetadata, Buf<Vec<u8>>, BufVecU8Allocator>) -> O,
512        >(
513            &mut self,
514            &FakeLinkDeviceId: &FakeLinkDeviceId,
515            cb: F,
516        ) -> O {
517            cb(&self.state.queue)
518        }
519
520        fn with_transmit_queue_mut<
521            O,
522            F: FnOnce(&mut TransmitQueueState<FakeTxMetadata, Buf<Vec<u8>>, BufVecU8Allocator>) -> O,
523        >(
524            &mut self,
525            &FakeLinkDeviceId: &FakeLinkDeviceId,
526            cb: F,
527        ) -> O {
528            cb(&mut self.state.queue)
529        }
530
531        fn send_frame(
532            &mut self,
533            _bindings_ctx: &mut FakeBindingsCtxImpl,
534            &FakeLinkDeviceId: &FakeLinkDeviceId,
535            dequeue_context: Option<&mut DequeueContext>,
536            _meta: FakeTxMetadata,
537            buf: Buf<Vec<u8>>,
538        ) -> Result<(), DeviceSendFrameError> {
539            let FakeTxQueueState { transmitted_packets, no_buffers, .. } = &mut self.state;
540            if *no_buffers {
541                Err(DeviceSendFrameError::NoBuffers)
542            } else {
543                Ok(transmitted_packets.push((buf, dequeue_context.map(|c| *c))))
544            }
545        }
546    }
547
548    impl ResourceCounterContext<FakeLinkDeviceId, DeviceCounters> for FakeCoreCtxImpl {
549        fn per_resource_counters<'a>(
550            &'a self,
551            _resource: &'a FakeLinkDeviceId,
552        ) -> &'a DeviceCounters {
553            &self.state.per_device_counters
554        }
555    }
556
557    impl CounterContext<DeviceCounters> for FakeCoreCtxImpl {
558        fn counters(&self) -> &DeviceCounters {
559            &self.state.stack_wide_device_counters
560        }
561    }
562
563    impl TransmitDequeueContext<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
564        type TransmitQueueCtx<'a> = Self;
565
566        fn with_dequed_packets_and_tx_queue_ctx<
567            O,
568            F: FnOnce(
569                &mut DequeueState<FakeTxMetadata, Buf<Vec<u8>>>,
570                &mut Self::TransmitQueueCtx<'_>,
571            ) -> O,
572        >(
573            &mut self,
574            &FakeLinkDeviceId: &FakeLinkDeviceId,
575            cb: F,
576        ) -> O {
577            cb(&mut DequeueState::default(), self)
578        }
579    }
580
581    impl DeviceSocketHandler<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
582        fn handle_frame(
583            &mut self,
584            bindings_ctx: &mut FakeBindingsCtxImpl,
585            _device: &Self::DeviceId,
586            frame: Frame<&[u8]>,
587            _whole_frame: &[u8],
588        ) {
589            bindings_ctx.state.delivered_to_sockets.push(frame.cloned())
590        }
591    }
592
593    #[test]
594    fn noqueue() {
595        let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());
596
597        let body = Buf::new(vec![0], ..);
598
599        let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
600        assert_eq!(
601            TransmitQueueHandler::queue_tx_frame(
602                core_ctx,
603                bindings_ctx,
604                &FakeLinkDeviceId,
605                FakeTxMetadata,
606                body.clone(),
607            ),
608            Ok(body.len())
609        );
610        let FakeTxQueueBindingsCtxState { woken_tx_tasks, delivered_to_sockets } =
611            &bindings_ctx.state;
612        assert_matches!(&woken_tx_tasks[..], &[]);
613        assert_eq!(
614            delivered_to_sockets,
615            &[Frame::Sent(fake_sent_ethernet_with_body(body.as_ref().into()))]
616        );
617        assert_eq!(core::mem::take(&mut core_ctx.state.transmitted_packets), [(body, None)]);
618
619        // Should not have any frames waiting to be transmitted since we have no
620        // queue.
621        assert_eq!(
622            ctx.transmit_queue_api().transmit_queued_frames(
623                &FakeLinkDeviceId,
624                BatchSize::default(),
625                &mut DequeueContext,
626            ),
627            Ok(WorkQueueReport::AllDone),
628        );
629
630        let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
631        assert_matches!(&bindings_ctx.state.woken_tx_tasks[..], &[]);
632        assert_eq!(core::mem::take(&mut core_ctx.state.transmitted_packets), []);
633    }
634
635    #[test_case(BatchSize::MAX)]
636    #[test_case(BatchSize::MAX/2)]
637    fn fifo_queue_and_dequeue(batch_size: usize) {
638        let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());
639
640        ctx.transmit_queue_api()
641            .set_configuration(&FakeLinkDeviceId, TransmitQueueConfiguration::Fifo);
642
643        for _ in 0..2 {
644            let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
645            for i in 0..MAX_TX_QUEUED_LEN {
646                let body = Buf::new(vec![i as u8], ..);
647                assert_eq!(
648                    TransmitQueueHandler::queue_tx_frame(
649                        core_ctx,
650                        bindings_ctx,
651                        &FakeLinkDeviceId,
652                        FakeTxMetadata,
653                        body
654                    ),
655                    Ok(1)
656                );
657                // We should only ever be woken up once when the first packet
658                // was enqueued.
659                assert_eq!(bindings_ctx.state.woken_tx_tasks, [FakeLinkDeviceId]);
660            }
661
662            let body = Buf::new(vec![131], ..);
663            assert_eq!(
664                TransmitQueueHandler::queue_tx_frame(
665                    core_ctx,
666                    bindings_ctx,
667                    &FakeLinkDeviceId,
668                    FakeTxMetadata,
669                    body.clone(),
670                ),
671                Err(TransmitQueueFrameError::QueueFull(body))
672            );
673
674            let FakeTxQueueBindingsCtxState { woken_tx_tasks, delivered_to_sockets } =
675                &mut bindings_ctx.state;
676            // We should only ever be woken up once when the first packet
677            // was enqueued.
678            assert_eq!(core::mem::take(woken_tx_tasks), [FakeLinkDeviceId]);
679            // No frames should be delivered to packet sockets before transmit.
680            assert_eq!(core::mem::take(delivered_to_sockets), &[]);
681
682            assert!(MAX_TX_QUEUED_LEN > batch_size);
683            for i in (0..(MAX_TX_QUEUED_LEN - batch_size)).step_by(batch_size) {
684                assert_eq!(
685                    ctx.transmit_queue_api().transmit_queued_frames(
686                        &FakeLinkDeviceId,
687                        BatchSize::new_saturating(batch_size),
688                        &mut DequeueContext
689                    ),
690                    Ok(WorkQueueReport::Pending),
691                );
692                assert_eq!(
693                    core::mem::take(&mut ctx.core_ctx.state.transmitted_packets),
694                    (i..i + batch_size)
695                        .map(|i| (Buf::new(vec![i as u8], ..), Some(DequeueContext)))
696                        .collect::<Vec<_>>()
697                );
698            }
699
700            assert_eq!(
701                ctx.transmit_queue_api().transmit_queued_frames(
702                    &FakeLinkDeviceId,
703                    BatchSize::new_saturating(batch_size),
704                    &mut DequeueContext
705                ),
706                Ok(WorkQueueReport::AllDone),
707            );
708
709            let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
710            assert_eq!(
711                core::mem::take(&mut core_ctx.state.transmitted_packets),
712                (batch_size * (MAX_TX_QUEUED_LEN / batch_size - 1)..MAX_TX_QUEUED_LEN)
713                    .map(|i| (Buf::new(vec![i as u8], ..), Some(DequeueContext)))
714                    .collect::<Vec<_>>()
715            );
716            // Should not have woken up the TX task since the queue should be
717            // empty.
718            let FakeTxQueueBindingsCtxState { woken_tx_tasks, delivered_to_sockets } =
719                &mut bindings_ctx.state;
720            assert_matches!(&core::mem::take(woken_tx_tasks)[..], &[]);
721
722            // The queue should now be empty so the next iteration of queueing
723            // `MAX_TX_QUEUED_FRAMES` packets should succeed.
724            assert_eq!(
725                core::mem::take(delivered_to_sockets),
726                (0..MAX_TX_QUEUED_LEN)
727                    .map(|i| Frame::Sent(fake_sent_ethernet_with_body(vec![i as u8])))
728                    .collect::<Vec<_>>()
729            );
730        }
731    }
732
733    #[test]
734    fn dequeue_error() {
735        let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());
736
737        ctx.transmit_queue_api()
738            .set_configuration(&FakeLinkDeviceId, TransmitQueueConfiguration::Fifo);
739
740        let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
741        let body = Buf::new(vec![0], ..);
742        assert_eq!(
743            TransmitQueueHandler::queue_tx_frame(
744                core_ctx,
745                bindings_ctx,
746                &FakeLinkDeviceId,
747                FakeTxMetadata,
748                body.clone(),
749            ),
750            Ok(body.len())
751        );
752        assert_eq!(core::mem::take(&mut bindings_ctx.state.woken_tx_tasks), [FakeLinkDeviceId]);
753        assert_eq!(core_ctx.state.transmitted_packets, []);
754
755        core_ctx.state.no_buffers = true;
756        assert_eq!(
757            ctx.transmit_queue_api().transmit_queued_frames(
758                &FakeLinkDeviceId,
759                BatchSize::default(),
760                &mut DequeueContext
761            ),
762            Err(DeviceSendFrameError::NoBuffers),
763        );
764        let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
765        assert_eq!(core_ctx.state.transmitted_packets, []);
766        let FakeTxQueueBindingsCtxState { woken_tx_tasks, delivered_to_sockets } =
767            &bindings_ctx.state;
768        assert_matches!(&woken_tx_tasks[..], &[]);
769        // Frames were delivered to packet sockets before the device was found
770        // to not be ready.
771        assert_eq!(
772            delivered_to_sockets,
773            &[Frame::Sent(fake_sent_ethernet_with_body(body.as_ref().into()))]
774        );
775
776        core_ctx.state.no_buffers = false;
777        assert_eq!(
778            ctx.transmit_queue_api().transmit_queued_frames(
779                &FakeLinkDeviceId,
780                BatchSize::default(),
781                &mut DequeueContext
782            ),
783            Ok(WorkQueueReport::AllDone),
784        );
785        let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
786        assert_matches!(&bindings_ctx.state.woken_tx_tasks[..], &[]);
787        // The packet that failed to dequeue is dropped.
788        assert_eq!(core::mem::take(&mut core_ctx.state.transmitted_packets), []);
789    }
790
791    #[test_case(true; "device no buffers")]
792    #[test_case(false; "device has buffers")]
793    fn drain_before_noqueue(no_buffers: bool) {
794        let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());
795
796        ctx.transmit_queue_api()
797            .set_configuration(&FakeLinkDeviceId, TransmitQueueConfiguration::Fifo);
798
799        let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
800        let body = Buf::new(vec![0], ..);
801        assert_eq!(
802            TransmitQueueHandler::queue_tx_frame(
803                core_ctx,
804                bindings_ctx,
805                &FakeLinkDeviceId,
806                FakeTxMetadata,
807                body.clone(),
808            ),
809            Ok(body.len())
810        );
811        assert_eq!(core::mem::take(&mut bindings_ctx.state.woken_tx_tasks), [FakeLinkDeviceId]);
812        assert_eq!(core_ctx.state.transmitted_packets, []);
813
814        core_ctx.state.no_buffers = no_buffers;
815        ctx.transmit_queue_api()
816            .set_configuration(&FakeLinkDeviceId, TransmitQueueConfiguration::None);
817
818        let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
819        let FakeTxQueueBindingsCtxState { woken_tx_tasks, delivered_to_sockets } =
820            &bindings_ctx.state;
821        assert_matches!(&woken_tx_tasks[..], &[]);
822        assert_eq!(
823            delivered_to_sockets,
824            &[Frame::Sent(fake_sent_ethernet_with_body(body.as_ref().into()))]
825        );
826        if no_buffers {
827            assert_eq!(core_ctx.state.transmitted_packets, []);
828        } else {
829            assert_eq!(core::mem::take(&mut core_ctx.state.transmitted_packets), [(body, None)]);
830        }
831    }
832
833    #[test]
834    fn count() {
835        let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());
836        assert_eq!(ctx.transmit_queue_api().count(&FakeLinkDeviceId), None);
837
838        ctx.transmit_queue_api()
839            .set_configuration(&FakeLinkDeviceId, TransmitQueueConfiguration::Fifo);
840
841        assert_eq!(ctx.transmit_queue_api().count(&FakeLinkDeviceId), Some(0));
842
843        let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
844        let body = Buf::new(vec![0], ..);
845        assert_eq!(
846            TransmitQueueHandler::queue_tx_frame(
847                core_ctx,
848                bindings_ctx,
849                &FakeLinkDeviceId,
850                FakeTxMetadata,
851                body,
852            ),
853            Ok(1)
854        );
855
856        assert_eq!(ctx.transmit_queue_api().count(&FakeLinkDeviceId), Some(1));
857    }
858}