Skip to main content

netstack3_device/queue/
tx.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! TX device queues.
6
7use core::convert::Infallible as Never;
8
9use alloc::vec::Vec;
10
11use derivative::Derivative;
12use log::trace;
13use netstack3_base::sync::Mutex;
14use netstack3_base::{
15    ChecksumOffloadSpec, Device, DeviceIdContext, ErrorAndSerializer, NetworkSerializationContext,
16    NetworkSerializer,
17};
18use packet::{Buf, FragmentedBuffer as _, NoReuseBufferProvider, ReusableBuffer, new_buf_vec};
19
20use crate::internal::base::{DeviceBufferBindingsTypes, DeviceSendFrameError};
21use crate::internal::queue::{
22    DequeueState, DeviceBufferSpec, EnqueueResult, TransmitQueueFrameError, fifo,
23};
24use crate::internal::socket::{DeviceSocketHandler, ParseSentFrameError, SentFrame};
25
26/// State associated with a device transmit queue.
27#[derive(Derivative)]
28#[derivative(Default(bound = "Allocator: Default"))]
29pub struct TransmitQueueState<Meta, Buffer, Allocator> {
30    pub(super) allocator: ByteSliceAllocator<Allocator, Buffer>,
31    pub(super) queue: Option<fifo::Queue<Meta, Buffer>>,
32    pub(super) checksum_offload_spec: ChecksumOffloadSpec,
33}
34
35/// Holds queue and dequeue state for the transmit queue.
36pub struct TransmitQueue<Meta, Buffer, Allocator> {
37    /// The state for dequeued packets that will be handled.
38    ///
39    /// See `queue` for lock ordering.
40    pub(crate) deque: Mutex<DequeueState<Meta, Buffer>>,
41    /// A queue of to-be-transmitted packets protected by a lock.
42    ///
43    /// Lock ordering: `deque` must be locked before `queue` is locked when both
44    /// are needed at the same time.
45    pub(crate) queue: Mutex<TransmitQueueState<Meta, Buffer, Allocator>>,
46}
47
48impl<Meta, Buffer, Allocator> TransmitQueue<Meta, Buffer, Allocator> {
49    pub(crate) fn new(allocator: Allocator, checksum_offload_spec: ChecksumOffloadSpec) -> Self {
50        Self {
51            deque: Mutex::new(DequeueState::default()),
52            queue: Mutex::new(TransmitQueueState {
53                allocator: ByteSliceAllocator::new(allocator),
54                queue: None,
55                checksum_offload_spec,
56            }),
57        }
58    }
59}
60
61/// The bindings context for the transmit queue.
62pub trait TransmitQueueBindingsContext<DeviceId>: DeviceBufferBindingsTypes {
63    /// Signals to bindings that TX frames are available and ready to be sent
64    /// over the device.
65    ///
66    /// Implementations must make sure that the API call to handle queued
67    /// packets is scheduled to be called as soon as possible so that enqueued
68    /// TX frames are promptly handled.
69    fn wake_tx_task(&mut self, device_id: &DeviceId);
70}
71
72/// Basic definitions for a transmit queue.
73pub trait TransmitQueueCommon<D: Device, C>: DeviceIdContext<D> {
74    /// The metadata associated with every packet in the queue.
75    type Meta;
76
77    /// The context given to `send_frame` when dequeueing.
78    type DequeueContext;
79
80    /// Parses an outgoing frame for packet socket delivery.
81    fn parse_outgoing_frame<'a, 'b>(
82        buf: &'a [u8],
83        meta: &'a Self::Meta,
84    ) -> Result<SentFrame<&'a [u8]>, ParseSentFrameError>;
85}
86
87/// The execution context for a transmit queue.
88pub trait TransmitQueueContext<D: DeviceBufferSpec<BC>, BC>: TransmitQueueCommon<D, BC> {
89    /// Calls `cb` with mutable access to the queue state.
90    fn with_transmit_queue_mut<
91        O,
92        F: FnOnce(&mut TransmitQueueState<Self::Meta, D::TxBuffer, D::TxAllocator>) -> O,
93    >(
94        &mut self,
95        device_id: &Self::DeviceId,
96        cb: F,
97    ) -> O;
98
99    /// Calls `cb` with immutable access to the queue state.
100    fn with_transmit_queue<
101        O,
102        F: FnOnce(&TransmitQueueState<Self::Meta, D::TxBuffer, D::TxAllocator>) -> O,
103    >(
104        &mut self,
105        device_id: &Self::DeviceId,
106        cb: F,
107    ) -> O;
108
109    /// Send a frame out the device.
110    ///
111    /// This method may not block - if the device is not ready, an appropriate
112    /// error must be returned.
113    fn send_frame(
114        &mut self,
115        bindings_ctx: &mut BC,
116        device_id: &Self::DeviceId,
117        dequeue_context: Option<&mut Self::DequeueContext>,
118        meta: Self::Meta,
119        buf: D::TxBuffer,
120    ) -> Result<(), DeviceSendFrameError>;
121}
122
123/// The core execution context for dequeueing TX frames from the transmit queue.
124pub trait TransmitDequeueContext<D: DeviceBufferSpec<BC>, BC>: TransmitQueueContext<D, BC> {
125    /// The inner context providing dequeuing.
126    type TransmitQueueCtx<'a>: TransmitQueueContext<
127            D,
128            BC,
129            Meta = Self::Meta,
130            DequeueContext = Self::DequeueContext,
131            DeviceId = Self::DeviceId,
132        > + DeviceSocketHandler<D, BC>;
133
134    /// Calls the function with the TX deque state and the TX queue context.
135    fn with_dequed_packets_and_tx_queue_ctx<
136        O,
137        F: FnOnce(&mut DequeueState<Self::Meta, D::TxBuffer>, &mut Self::TransmitQueueCtx<'_>) -> O,
138    >(
139        &mut self,
140        device_id: &Self::DeviceId,
141        cb: F,
142    ) -> O;
143}
144
145/// The configuration for a transmit queue.
146pub enum TransmitQueueConfiguration {
147    /// No queue.
148    None,
149    /// FiFo queue.
150    Fifo,
151}
152
153/// An implementation of a transmit queue that stores egress frames.
154pub trait TransmitQueueHandler<D: Device, BC>: TransmitQueueCommon<D, BC> {
155    /// Queues a frame for transmission.
156    fn queue_tx_frame<S>(
157        &mut self,
158        bindings_ctx: &mut BC,
159        device_id: &Self::DeviceId,
160        meta: Self::Meta,
161        body: S,
162    ) -> Result<usize, TransmitQueueFrameError<S>>
163    where
164        S: NetworkSerializer,
165        S::Buffer: ReusableBuffer;
166}
167
168pub(super) fn deliver_to_device_sockets<
169    D: DeviceBufferSpec<BC>,
170    BC: TransmitQueueBindingsContext<CC::DeviceId>,
171    CC: TransmitQueueCommon<D, BC> + DeviceSocketHandler<D, BC>,
172>(
173    core_ctx: &mut CC,
174    bindings_ctx: &mut BC,
175    device_id: &CC::DeviceId,
176    buffer: &D::TxBuffer,
177    meta: &CC::Meta,
178) {
179    buffer.with_bytes(|b| {
180        let mut handle_parsed = |bytes: &[u8]| match CC::parse_outgoing_frame(bytes, meta) {
181            Ok(sent_frame) => DeviceSocketHandler::handle_frame(
182                core_ctx,
183                bindings_ctx,
184                device_id,
185                sent_frame.into(),
186                bytes,
187            ),
188            Err(ParseSentFrameError) => {
189                trace!("failed to parse outgoing frame on {:?} ({} bytes)", device_id, bytes.len())
190            }
191        };
192
193        if let Some(bytes) = b.try_get_contiguous() {
194            handle_parsed(bytes)
195        } else {
196            handle_parsed(&b.to_flattened_vec())
197        }
198    })
199}
200
201impl EnqueueResult {
202    fn maybe_wake_tx<D, BC: TransmitQueueBindingsContext<D>>(
203        self,
204        bindings_ctx: &mut BC,
205        device_id: &D,
206    ) {
207        match self {
208            Self::QueuePreviouslyWasOccupied => (),
209            Self::QueueWasPreviouslyEmpty => bindings_ctx.wake_tx_task(device_id),
210        }
211    }
212}
213
214enum EnqueueStatus<Meta, Buffer> {
215    NotAttempted(Meta, Buffer),
216    Attempted,
217}
218
219// Extracted to a function without the generic serializer parameter to ease code
220// generation.
221fn insert_and_notify<
222    D: DeviceBufferSpec<BC>,
223    BC: TransmitQueueBindingsContext<CC::DeviceId>,
224    CC: TransmitQueueContext<D, BC> + DeviceSocketHandler<D, BC>,
225>(
226    bindings_ctx: &mut BC,
227    device_id: &CC::DeviceId,
228    inserter: Option<fifo::QueueTxInserter<'_, CC::Meta, D::TxBuffer>>,
229    meta: CC::Meta,
230    body: D::TxBuffer,
231) -> EnqueueStatus<CC::Meta, D::TxBuffer> {
232    match inserter {
233        // No TX queue so send the frame immediately.
234        None => EnqueueStatus::NotAttempted(meta, body),
235        Some(inserter) => {
236            inserter.insert(meta, body).maybe_wake_tx(bindings_ctx, device_id);
237            EnqueueStatus::Attempted
238        }
239    }
240}
241
242// Extracted to a function without the generic serializer parameter to ease code
243// generation.
244fn handle_post_enqueue<
245    D: DeviceBufferSpec<BC>,
246    BC: TransmitQueueBindingsContext<CC::DeviceId>,
247    CC: TransmitQueueContext<D, BC> + DeviceSocketHandler<D, BC>,
248>(
249    core_ctx: &mut CC,
250    bindings_ctx: &mut BC,
251    device_id: &CC::DeviceId,
252    status: EnqueueStatus<CC::Meta, D::TxBuffer>,
253) -> Result<(), DeviceSendFrameError> {
254    match status {
255        EnqueueStatus::NotAttempted(meta, body) => {
256            // TODO(https://fxbug.dev/42077654): Deliver the frame to packet
257            // sockets and to the device atomically.
258            deliver_to_device_sockets(core_ctx, bindings_ctx, device_id, &body, &meta);
259            // Send the frame while not holding the TX queue exclusively to
260            // not block concurrent senders from making progress.
261            core_ctx.send_frame(bindings_ctx, device_id, None, meta, body)
262        }
263        EnqueueStatus::Attempted => Ok(()),
264    }
265}
266
267impl<
268    D: DeviceBufferSpec<BC>,
269    BC: TransmitQueueBindingsContext<CC::DeviceId>,
270    CC: TransmitQueueContext<D, BC> + DeviceSocketHandler<D, BC>,
271> TransmitQueueHandler<D, BC> for CC
272{
273    fn queue_tx_frame<S>(
274        &mut self,
275        bindings_ctx: &mut BC,
276        device_id: &CC::DeviceId,
277        meta: CC::Meta,
278        body: S,
279    ) -> Result<usize, TransmitQueueFrameError<S>>
280    where
281        S: NetworkSerializer,
282        S::Buffer: ReusableBuffer,
283    {
284        let (len, result) = self.with_transmit_queue_mut(
285            device_id,
286            |TransmitQueueState { allocator, queue, checksum_offload_spec }| {
287                let queue_len = queue.as_ref().map_or(0, |q| q.len());
288                let inserter = match queue {
289                    None => None,
290                    Some(q) => match q.tx_inserter() {
291                        Some(i) => Some(i),
292                        None => return Err(TransmitQueueFrameError::QueueFull(body)),
293                    },
294                };
295                let mut context = NetworkSerializationContext::new(checksum_offload_spec.clone());
296                let body = body
297                    .serialize_outer(
298                        &mut context,
299                        NoReuseBufferProvider(BufferAllocAdaptor {
300                            allocator: &mut *allocator,
301                            queue_len,
302                        }),
303                    )
304                    .map_err(|(e, serializer)| {
305                        TransmitQueueFrameError::SerializeError(ErrorAndSerializer {
306                            serializer,
307                            error: e.map_alloc(
308                                |_: <D::TxAllocator as TxBufferAllocator<D::TxBuffer>>::Error| (),
309                            ),
310                        })
311                    })?;
312                // TODO(https://fxbug.dev/512101182): Insert the checksum
313                // offloading result into the queue along with the buffer so
314                // that it can be passed down to netdevice.
315                let len = body.len();
316                let result = insert_and_notify::<_, _, CC>(
317                    bindings_ctx,
318                    device_id,
319                    inserter,
320                    meta,
321                    allocator.get_buffer(),
322                );
323                Ok((len, result))
324            },
325        )?;
326
327        handle_post_enqueue(self, bindings_ctx, device_id, result)
328            .map(|()| len)
329            .map_err(TransmitQueueFrameError::NoQueue)
330    }
331}
332
333/// Allocator for Tx buffers to be stored in Tx queue.
334pub trait TxBufferAllocator<Buffer> {
335    /// Error for allocating a Tx buffer.
336    type Error;
337
338    /// Allocate Tx buffer. This method is only called while the TX queue is
339    /// locked. `queue_len` is the current length of the TX queue.
340    fn alloc(&mut self, len: usize, queue_len: usize) -> Result<Buffer, Self::Error>;
341}
342
343/// Turns a `TxBufferAllocator` into a buffer allocator for `Buf<&mut [u8]>`.
344/// The caller can serialize into the returned `Buf<&mut [u8]>` and retrieve
345/// the buffer after serialization. This avoids the binary bloat by not adding
346/// serialization code for different buffer types.
347#[derive(Derivative)]
348#[derivative(Default(bound = "A: Default"))]
349pub(super) struct ByteSliceAllocator<A, B> {
350    allocator: A,
351    buffer: Option<B>,
352}
353
354impl<A, B> ByteSliceAllocator<A, B> {
355    fn new(allocator: A) -> Self {
356        Self { allocator, buffer: None }
357    }
358}
359
360impl<A: TxBufferAllocator<B>, B: AsMut<[u8]>> ByteSliceAllocator<A, B> {
361    /// Returns a borrowed slice that the caller can serialize into.
362    fn alloc_buf_slice(
363        &mut self,
364        len: usize,
365        queue_len: usize,
366    ) -> Result<Buf<&mut [u8]>, A::Error> {
367        let old = self.buffer.replace(self.allocator.alloc(len, queue_len)?);
368        debug_assert!(old.is_none());
369        // The allocator may allocate a buffer larger than requested to fulfill
370        // the minimum tx length. We cap it at `len`.
371        Ok(Buf::new(&mut self.buffer.as_mut().expect("must be set").as_mut()[..len], ..))
372    }
373
374    /// This is intended to obtain the ownership of the buffer and store it
375    /// into the tx queue.
376    fn get_buffer(&mut self) -> B {
377        self.buffer.take().expect("must be allocated")
378    }
379}
380
381struct BufferAllocAdaptor<'a, A, B> {
382    allocator: &'a mut ByteSliceAllocator<A, B>,
383    queue_len: usize,
384}
385
386impl<'a, A, B> packet::BufferAlloc<Buf<&'a mut [u8]>> for BufferAllocAdaptor<'a, A, B>
387where
388    B: AsMut<[u8]>,
389    A: TxBufferAllocator<B>,
390{
391    type Error = A::Error;
392
393    fn alloc(self, len: usize) -> Result<Buf<&'a mut [u8]>, Self::Error> {
394        self.allocator.alloc_buf_slice(len, self.queue_len)
395    }
396}
397
398/// An allocator of [`Buf<Vec<u8>>`] .
399#[derive(Default)]
400pub struct BufVecU8Allocator;
401
402impl TxBufferAllocator<Buf<Vec<u8>>> for BufVecU8Allocator {
403    type Error = Never;
404
405    fn alloc(&mut self, len: usize, _qlen: usize) -> Result<Buf<Vec<u8>>, Self::Error> {
406        new_buf_vec(len)
407    }
408}
409
410#[cfg(test)]
411mod tests {
412    use super::*;
413
414    use alloc::vec;
415
416    use assert_matches::assert_matches;
417    use net_declare::net_mac;
418    use net_types::ethernet::Mac;
419    use netstack3_base::testutil::{
420        FakeBindingsCtx, FakeCoreCtx, FakeLinkDevice, FakeLinkDeviceId,
421    };
422
423    impl<BT> DeviceBufferSpec<BT> for FakeLinkDevice {
424        type TxBuffer = packet::Buf<Vec<u8>>;
425        type TxAllocator = BufVecU8Allocator;
426    }
427    use netstack3_base::{
428        ContextPair, CounterContext, CtxPair, ResourceCounterContext, WorkQueueReport,
429    };
430    use test_case::test_case;
431
432    use crate::DeviceCounters;
433    use crate::internal::queue::api::TransmitQueueApi;
434    use crate::internal::queue::{BatchSize, MAX_TX_QUEUED_LEN};
435    use crate::internal::socket::{EthernetFrame, Frame};
436
437    #[derive(Default)]
438    struct FakeTxQueueState {
439        queue: TransmitQueueState<(), Buf<Vec<u8>>, BufVecU8Allocator>,
440        transmitted_packets: Vec<(Buf<Vec<u8>>, Option<DequeueContext>)>,
441        no_buffers: bool,
442        stack_wide_device_counters: DeviceCounters,
443        per_device_counters: DeviceCounters,
444    }
445
446    #[derive(Default)]
447    struct FakeTxQueueBindingsCtxState {
448        woken_tx_tasks: Vec<FakeLinkDeviceId>,
449        delivered_to_sockets: Vec<Frame<Vec<u8>>>,
450    }
451
452    type FakeCoreCtxImpl = FakeCoreCtx<FakeTxQueueState, (), FakeLinkDeviceId>;
453    type FakeBindingsCtxImpl = FakeBindingsCtx<(), (), FakeTxQueueBindingsCtxState, ()>;
454
455    impl TransmitQueueBindingsContext<FakeLinkDeviceId> for FakeBindingsCtxImpl {
456        fn wake_tx_task(&mut self, device_id: &FakeLinkDeviceId) {
457            self.state.woken_tx_tasks.push(device_id.clone())
458        }
459    }
460
461    const SRC_MAC: Mac = net_mac!("AA:BB:CC:DD:EE:FF");
462    const DEST_MAC: Mac = net_mac!("FF:EE:DD:CC:BB:AA");
463
464    #[derive(Copy, Clone, Debug, Eq, PartialEq)]
465    struct DequeueContext;
466
467    impl TransmitQueueCommon<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
468        type DequeueContext = DequeueContext;
469        type Meta = ();
470
471        fn parse_outgoing_frame<'a, 'b>(
472            buf: &'a [u8],
473            (): &'b Self::Meta,
474        ) -> Result<SentFrame<&'a [u8]>, ParseSentFrameError> {
475            Ok(fake_sent_ethernet_with_body(buf))
476        }
477    }
478
479    fn fake_sent_ethernet_with_body<B>(body: B) -> SentFrame<B> {
480        SentFrame::Ethernet(EthernetFrame {
481            src_mac: SRC_MAC,
482            dst_mac: DEST_MAC,
483            ethertype: None,
484            body_offset: 0,
485            body,
486        })
487    }
488
489    /// A trait providing a shortcut to instantiate a [`TransmitQueueApi`] from a context.
490    trait TransmitQueueApiExt: ContextPair + Sized {
491        fn transmit_queue_api<D>(&mut self) -> TransmitQueueApi<D, &mut Self> {
492            TransmitQueueApi::new(self)
493        }
494    }
495
496    impl<O> TransmitQueueApiExt for O where O: ContextPair + Sized {}
497
498    impl TransmitQueueContext<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
499        fn with_transmit_queue<
500            O,
501            F: FnOnce(&TransmitQueueState<(), Buf<Vec<u8>>, BufVecU8Allocator>) -> O,
502        >(
503            &mut self,
504            &FakeLinkDeviceId: &FakeLinkDeviceId,
505            cb: F,
506        ) -> O {
507            cb(&self.state.queue)
508        }
509
510        fn with_transmit_queue_mut<
511            O,
512            F: FnOnce(&mut TransmitQueueState<(), Buf<Vec<u8>>, BufVecU8Allocator>) -> O,
513        >(
514            &mut self,
515            &FakeLinkDeviceId: &FakeLinkDeviceId,
516            cb: F,
517        ) -> O {
518            cb(&mut self.state.queue)
519        }
520
521        fn send_frame(
522            &mut self,
523            _bindings_ctx: &mut FakeBindingsCtxImpl,
524            &FakeLinkDeviceId: &FakeLinkDeviceId,
525            dequeue_context: Option<&mut DequeueContext>,
526            (): (),
527            buf: Buf<Vec<u8>>,
528        ) -> Result<(), DeviceSendFrameError> {
529            let FakeTxQueueState { transmitted_packets, no_buffers, .. } = &mut self.state;
530            if *no_buffers {
531                Err(DeviceSendFrameError::NoBuffers)
532            } else {
533                Ok(transmitted_packets.push((buf, dequeue_context.map(|c| *c))))
534            }
535        }
536    }
537
538    impl ResourceCounterContext<FakeLinkDeviceId, DeviceCounters> for FakeCoreCtxImpl {
539        fn per_resource_counters<'a>(
540            &'a self,
541            _resource: &'a FakeLinkDeviceId,
542        ) -> &'a DeviceCounters {
543            &self.state.per_device_counters
544        }
545    }
546
547    impl CounterContext<DeviceCounters> for FakeCoreCtxImpl {
548        fn counters(&self) -> &DeviceCounters {
549            &self.state.stack_wide_device_counters
550        }
551    }
552
553    impl TransmitDequeueContext<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
554        type TransmitQueueCtx<'a> = Self;
555
556        fn with_dequed_packets_and_tx_queue_ctx<
557            O,
558            F: FnOnce(&mut DequeueState<(), Buf<Vec<u8>>>, &mut Self::TransmitQueueCtx<'_>) -> O,
559        >(
560            &mut self,
561            &FakeLinkDeviceId: &FakeLinkDeviceId,
562            cb: F,
563        ) -> O {
564            cb(&mut DequeueState::default(), self)
565        }
566    }
567
568    impl DeviceSocketHandler<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
569        fn handle_frame(
570            &mut self,
571            bindings_ctx: &mut FakeBindingsCtxImpl,
572            _device: &Self::DeviceId,
573            frame: Frame<&[u8]>,
574            _whole_frame: &[u8],
575        ) {
576            bindings_ctx.state.delivered_to_sockets.push(frame.cloned())
577        }
578    }
579
580    #[test]
581    fn noqueue() {
582        let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());
583
584        let body = Buf::new(vec![0], ..);
585
586        let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
587        assert_eq!(
588            TransmitQueueHandler::queue_tx_frame(
589                core_ctx,
590                bindings_ctx,
591                &FakeLinkDeviceId,
592                (),
593                body.clone(),
594            ),
595            Ok(body.len())
596        );
597        let FakeTxQueueBindingsCtxState { woken_tx_tasks, delivered_to_sockets } =
598            &bindings_ctx.state;
599        assert_matches!(&woken_tx_tasks[..], &[]);
600        assert_eq!(
601            delivered_to_sockets,
602            &[Frame::Sent(fake_sent_ethernet_with_body(body.as_ref().into()))]
603        );
604        assert_eq!(core::mem::take(&mut core_ctx.state.transmitted_packets), [(body, None)]);
605
606        // Should not have any frames waiting to be transmitted since we have no
607        // queue.
608        assert_eq!(
609            ctx.transmit_queue_api().transmit_queued_frames(
610                &FakeLinkDeviceId,
611                BatchSize::default(),
612                &mut DequeueContext,
613            ),
614            Ok(WorkQueueReport::AllDone),
615        );
616
617        let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
618        assert_matches!(&bindings_ctx.state.woken_tx_tasks[..], &[]);
619        assert_eq!(core::mem::take(&mut core_ctx.state.transmitted_packets), []);
620    }
621
622    #[test_case(BatchSize::MAX)]
623    #[test_case(BatchSize::MAX/2)]
624    fn fifo_queue_and_dequeue(batch_size: usize) {
625        let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());
626
627        ctx.transmit_queue_api()
628            .set_configuration(&FakeLinkDeviceId, TransmitQueueConfiguration::Fifo);
629
630        for _ in 0..2 {
631            let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
632            for i in 0..MAX_TX_QUEUED_LEN {
633                let body = Buf::new(vec![i as u8], ..);
634                assert_eq!(
635                    TransmitQueueHandler::queue_tx_frame(
636                        core_ctx,
637                        bindings_ctx,
638                        &FakeLinkDeviceId,
639                        (),
640                        body
641                    ),
642                    Ok(1)
643                );
644                // We should only ever be woken up once when the first packet
645                // was enqueued.
646                assert_eq!(bindings_ctx.state.woken_tx_tasks, [FakeLinkDeviceId]);
647            }
648
649            let body = Buf::new(vec![131], ..);
650            assert_eq!(
651                TransmitQueueHandler::queue_tx_frame(
652                    core_ctx,
653                    bindings_ctx,
654                    &FakeLinkDeviceId,
655                    (),
656                    body.clone(),
657                ),
658                Err(TransmitQueueFrameError::QueueFull(body))
659            );
660
661            let FakeTxQueueBindingsCtxState { woken_tx_tasks, delivered_to_sockets } =
662                &mut bindings_ctx.state;
663            // We should only ever be woken up once when the first packet
664            // was enqueued.
665            assert_eq!(core::mem::take(woken_tx_tasks), [FakeLinkDeviceId]);
666            // No frames should be delivered to packet sockets before transmit.
667            assert_eq!(core::mem::take(delivered_to_sockets), &[]);
668
669            assert!(MAX_TX_QUEUED_LEN > batch_size);
670            for i in (0..(MAX_TX_QUEUED_LEN - batch_size)).step_by(batch_size) {
671                assert_eq!(
672                    ctx.transmit_queue_api().transmit_queued_frames(
673                        &FakeLinkDeviceId,
674                        BatchSize::new_saturating(batch_size),
675                        &mut DequeueContext
676                    ),
677                    Ok(WorkQueueReport::Pending),
678                );
679                assert_eq!(
680                    core::mem::take(&mut ctx.core_ctx.state.transmitted_packets),
681                    (i..i + batch_size)
682                        .map(|i| (Buf::new(vec![i as u8], ..), Some(DequeueContext)))
683                        .collect::<Vec<_>>()
684                );
685            }
686
687            assert_eq!(
688                ctx.transmit_queue_api().transmit_queued_frames(
689                    &FakeLinkDeviceId,
690                    BatchSize::new_saturating(batch_size),
691                    &mut DequeueContext
692                ),
693                Ok(WorkQueueReport::AllDone),
694            );
695
696            let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
697            assert_eq!(
698                core::mem::take(&mut core_ctx.state.transmitted_packets),
699                (batch_size * (MAX_TX_QUEUED_LEN / batch_size - 1)..MAX_TX_QUEUED_LEN)
700                    .map(|i| (Buf::new(vec![i as u8], ..), Some(DequeueContext)))
701                    .collect::<Vec<_>>()
702            );
703            // Should not have woken up the TX task since the queue should be
704            // empty.
705            let FakeTxQueueBindingsCtxState { woken_tx_tasks, delivered_to_sockets } =
706                &mut bindings_ctx.state;
707            assert_matches!(&core::mem::take(woken_tx_tasks)[..], &[]);
708
709            // The queue should now be empty so the next iteration of queueing
710            // `MAX_TX_QUEUED_FRAMES` packets should succeed.
711            assert_eq!(
712                core::mem::take(delivered_to_sockets),
713                (0..MAX_TX_QUEUED_LEN)
714                    .map(|i| Frame::Sent(fake_sent_ethernet_with_body(vec![i as u8])))
715                    .collect::<Vec<_>>()
716            );
717        }
718    }
719
720    #[test]
721    fn dequeue_error() {
722        let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());
723
724        ctx.transmit_queue_api()
725            .set_configuration(&FakeLinkDeviceId, TransmitQueueConfiguration::Fifo);
726
727        let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
728        let body = Buf::new(vec![0], ..);
729        assert_eq!(
730            TransmitQueueHandler::queue_tx_frame(
731                core_ctx,
732                bindings_ctx,
733                &FakeLinkDeviceId,
734                (),
735                body.clone(),
736            ),
737            Ok(body.len())
738        );
739        assert_eq!(core::mem::take(&mut bindings_ctx.state.woken_tx_tasks), [FakeLinkDeviceId]);
740        assert_eq!(core_ctx.state.transmitted_packets, []);
741
742        core_ctx.state.no_buffers = true;
743        assert_eq!(
744            ctx.transmit_queue_api().transmit_queued_frames(
745                &FakeLinkDeviceId,
746                BatchSize::default(),
747                &mut DequeueContext
748            ),
749            Err(DeviceSendFrameError::NoBuffers),
750        );
751        let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
752        assert_eq!(core_ctx.state.transmitted_packets, []);
753        let FakeTxQueueBindingsCtxState { woken_tx_tasks, delivered_to_sockets } =
754            &bindings_ctx.state;
755        assert_matches!(&woken_tx_tasks[..], &[]);
756        // Frames were delivered to packet sockets before the device was found
757        // to not be ready.
758        assert_eq!(
759            delivered_to_sockets,
760            &[Frame::Sent(fake_sent_ethernet_with_body(body.as_ref().into()))]
761        );
762
763        core_ctx.state.no_buffers = false;
764        assert_eq!(
765            ctx.transmit_queue_api().transmit_queued_frames(
766                &FakeLinkDeviceId,
767                BatchSize::default(),
768                &mut DequeueContext
769            ),
770            Ok(WorkQueueReport::AllDone),
771        );
772        let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
773        assert_matches!(&bindings_ctx.state.woken_tx_tasks[..], &[]);
774        // The packet that failed to dequeue is dropped.
775        assert_eq!(core::mem::take(&mut core_ctx.state.transmitted_packets), []);
776    }
777
778    #[test_case(true; "device no buffers")]
779    #[test_case(false; "device has buffers")]
780    fn drain_before_noqueue(no_buffers: bool) {
781        let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());
782
783        ctx.transmit_queue_api()
784            .set_configuration(&FakeLinkDeviceId, TransmitQueueConfiguration::Fifo);
785
786        let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
787        let body = Buf::new(vec![0], ..);
788        assert_eq!(
789            TransmitQueueHandler::queue_tx_frame(
790                core_ctx,
791                bindings_ctx,
792                &FakeLinkDeviceId,
793                (),
794                body.clone(),
795            ),
796            Ok(body.len())
797        );
798        assert_eq!(core::mem::take(&mut bindings_ctx.state.woken_tx_tasks), [FakeLinkDeviceId]);
799        assert_eq!(core_ctx.state.transmitted_packets, []);
800
801        core_ctx.state.no_buffers = no_buffers;
802        ctx.transmit_queue_api()
803            .set_configuration(&FakeLinkDeviceId, TransmitQueueConfiguration::None);
804
805        let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
806        let FakeTxQueueBindingsCtxState { woken_tx_tasks, delivered_to_sockets } =
807            &bindings_ctx.state;
808        assert_matches!(&woken_tx_tasks[..], &[]);
809        assert_eq!(
810            delivered_to_sockets,
811            &[Frame::Sent(fake_sent_ethernet_with_body(body.as_ref().into()))]
812        );
813        if no_buffers {
814            assert_eq!(core_ctx.state.transmitted_packets, []);
815        } else {
816            assert_eq!(core::mem::take(&mut core_ctx.state.transmitted_packets), [(body, None)]);
817        }
818    }
819
820    #[test]
821    fn count() {
822        let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());
823        assert_eq!(ctx.transmit_queue_api().count(&FakeLinkDeviceId), None);
824
825        ctx.transmit_queue_api()
826            .set_configuration(&FakeLinkDeviceId, TransmitQueueConfiguration::Fifo);
827
828        assert_eq!(ctx.transmit_queue_api().count(&FakeLinkDeviceId), Some(0));
829
830        let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
831        let body = Buf::new(vec![0], ..);
832        assert_eq!(
833            TransmitQueueHandler::queue_tx_frame(
834                core_ctx,
835                bindings_ctx,
836                &FakeLinkDeviceId,
837                (),
838                body,
839            ),
840            Ok(1)
841        );
842
843        assert_eq!(ctx.transmit_queue_api().count(&FakeLinkDeviceId), Some(1));
844    }
845}