netstack3_device/queue/
tx.rs

1// Copyright 2023 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! TX device queues.
6
7use alloc::vec::Vec;
8use core::convert::Infallible as Never;
9
10use derivative::Derivative;
11use log::trace;
12use netstack3_base::sync::Mutex;
13use netstack3_base::{Device, DeviceIdContext, ErrorAndSerializer};
14use packet::{
15    Buf, BufferAlloc, ContiguousBuffer, FragmentedBuffer as _, GrowBufferMut,
16    NoReuseBufferProvider, ReusableBuffer, Serializer, new_buf_vec,
17};
18
19use crate::internal::base::DeviceSendFrameError;
20use crate::internal::queue::{DequeueState, EnqueueResult, TransmitQueueFrameError, fifo};
21use crate::internal::socket::{DeviceSocketHandler, ParseSentFrameError, SentFrame};
22
/// State associated with a device transmit queue.
///
/// Bundles the buffer allocator used when serializing frames with the
/// (optional) queue that holds frames awaiting transmission.
#[derive(Derivative)]
#[derivative(Default(bound = "Allocator: Default"))]
pub struct TransmitQueueState<Meta, Buffer, Allocator> {
    /// Allocator handed to the serializer when a frame is queued.
    pub(super) allocator: Allocator,
    /// The queue of pending frames. `None` means no queue is configured, in
    /// which case frames are sent to the device immediately instead of being
    /// enqueued.
    pub(super) queue: Option<fifo::Queue<Meta, Buffer>>,
}
30
/// Holds queue and dequeue state for the transmit queue.
///
/// Each half is protected by its own lock; see the field documentation for
/// the required lock ordering.
#[derive(Derivative)]
#[derivative(Default(bound = "Allocator: Default"))]
pub struct TransmitQueue<Meta, Buffer, Allocator> {
    /// The state for dequeued packets that will be handled.
    ///
    /// See `queue` for lock ordering.
    pub(crate) deque: Mutex<DequeueState<Meta, Buffer>>,
    /// A queue of to-be-transmitted packets protected by a lock.
    ///
    /// Lock ordering: `deque` must be locked before `queue` is locked when both
    /// are needed at the same time.
    pub(crate) queue: Mutex<TransmitQueueState<Meta, Buffer, Allocator>>,
}
45
/// The bindings context for the transmit queue.
///
/// Implemented by bindings so that core can request that queued TX frames be
/// processed.
pub trait TransmitQueueBindingsContext<DeviceId> {
    /// Signals to bindings that TX frames are available and ready to be sent
    /// over the device.
    ///
    /// Implementations must make sure that the API call to handle queued
    /// packets is scheduled to be called as soon as possible so that enqueued
    /// TX frames are promptly handled.
    fn wake_tx_task(&mut self, device_id: &DeviceId);
}
56
57/// Basic definitions for a transmit queue.
58pub trait TransmitQueueCommon<D: Device, C>: DeviceIdContext<D> {
59    /// The metadata associated with every packet in the queue.
60    type Meta;
61    /// An allocator of [`Self::Buffer`].
62    type Allocator;
63    /// The buffer type stored in the queue.
64    type Buffer: GrowBufferMut + ContiguousBuffer;
65    /// The context given to `send_frame` when dequeueing.
66    type DequeueContext;
67
68    /// Parses an outgoing frame for packet socket delivery.
69    fn parse_outgoing_frame<'a, 'b>(
70        buf: &'a [u8],
71        meta: &'a Self::Meta,
72    ) -> Result<SentFrame<&'a [u8]>, ParseSentFrameError>;
73}
74
75/// The execution context for a transmit queue.
76pub trait TransmitQueueContext<D: Device, BC>: TransmitQueueCommon<D, BC> {
77    /// Calls `cb` with mutable access to the queue state.
78    fn with_transmit_queue_mut<
79        O,
80        F: FnOnce(&mut TransmitQueueState<Self::Meta, Self::Buffer, Self::Allocator>) -> O,
81    >(
82        &mut self,
83        device_id: &Self::DeviceId,
84        cb: F,
85    ) -> O;
86
87    /// Calls `cb` with immutable access to the queue state.
88    fn with_transmit_queue<
89        O,
90        F: FnOnce(&TransmitQueueState<Self::Meta, Self::Buffer, Self::Allocator>) -> O,
91    >(
92        &mut self,
93        device_id: &Self::DeviceId,
94        cb: F,
95    ) -> O;
96
97    /// Send a frame out the device.
98    ///
99    /// This method may not block - if the device is not ready, an appropriate
100    /// error must be returned.
101    fn send_frame(
102        &mut self,
103        bindings_ctx: &mut BC,
104        device_id: &Self::DeviceId,
105        dequeue_context: Option<&mut Self::DequeueContext>,
106        meta: Self::Meta,
107        buf: Self::Buffer,
108    ) -> Result<(), DeviceSendFrameError>;
109}
110
111/// The core execution context for dequeueing TX frames from the transmit queue.
112pub trait TransmitDequeueContext<D: Device, BC>: TransmitQueueContext<D, BC> {
113    /// The inner context providing dequeuing.
114    type TransmitQueueCtx<'a>: TransmitQueueContext<
115            D,
116            BC,
117            Meta = Self::Meta,
118            Buffer = Self::Buffer,
119            DequeueContext = Self::DequeueContext,
120            DeviceId = Self::DeviceId,
121        > + DeviceSocketHandler<D, BC>;
122
123    /// Calls the function with the TX deque state and the TX queue context.
124    fn with_dequed_packets_and_tx_queue_ctx<
125        O,
126        F: FnOnce(&mut DequeueState<Self::Meta, Self::Buffer>, &mut Self::TransmitQueueCtx<'_>) -> O,
127    >(
128        &mut self,
129        device_id: &Self::DeviceId,
130        cb: F,
131    ) -> O;
132}
133
/// The configuration for a transmit queue.
pub enum TransmitQueueConfiguration {
    /// No queue.
    None,
    /// FIFO (first-in, first-out) queue.
    Fifo,
}
141
/// An implementation of a transmit queue that stores egress frames.
pub trait TransmitQueueHandler<D: Device, BC>: TransmitQueueCommon<D, BC> {
    /// Queues a frame for transmission.
    ///
    /// `body` is the not-yet-serialized frame and `meta` the metadata that
    /// accompanies it through the queue.
    ///
    /// On failure the returned [`TransmitQueueFrameError`] describes why the
    /// frame could not be queued.
    fn queue_tx_frame<S>(
        &mut self,
        bindings_ctx: &mut BC,
        device_id: &Self::DeviceId,
        meta: Self::Meta,
        body: S,
    ) -> Result<usize, TransmitQueueFrameError<S>>
    where
        S: Serializer,
        S::Buffer: ReusableBuffer;
}
156
157pub(super) fn deliver_to_device_sockets<
158    D: Device,
159    BC: TransmitQueueBindingsContext<CC::DeviceId>,
160    CC: TransmitQueueCommon<D, BC> + DeviceSocketHandler<D, BC>,
161>(
162    core_ctx: &mut CC,
163    bindings_ctx: &mut BC,
164    device_id: &CC::DeviceId,
165    buffer: &CC::Buffer,
166    meta: &CC::Meta,
167) {
168    let bytes = buffer.as_ref();
169    match CC::parse_outgoing_frame(bytes, meta) {
170        Ok(sent_frame) => DeviceSocketHandler::handle_frame(
171            core_ctx,
172            bindings_ctx,
173            device_id,
174            sent_frame.into(),
175            bytes,
176        ),
177        Err(ParseSentFrameError) => {
178            trace!("failed to parse outgoing frame on {:?} ({} bytes)", device_id, bytes.len())
179        }
180    }
181}
182
183impl EnqueueResult {
184    fn maybe_wake_tx<D, BC: TransmitQueueBindingsContext<D>>(
185        self,
186        bindings_ctx: &mut BC,
187        device_id: &D,
188    ) {
189        match self {
190            Self::QueuePreviouslyWasOccupied => (),
191            Self::QueueWasPreviouslyEmpty => bindings_ctx.wake_tx_task(device_id),
192        }
193    }
194}
195
/// The outcome of trying to place a serialized frame on the TX queue.
enum EnqueueStatus<Meta, Buffer> {
    /// No queue is configured; the frame and its metadata are handed back so
    /// they can be sent directly.
    NotAttempted(Meta, Buffer),
    /// The frame was inserted into the queue.
    Attempted,
}
200
201// Extracted to a function without the generic serializer parameter to ease code
202// generation.
203fn insert_and_notify<
204    D: Device,
205    BC: TransmitQueueBindingsContext<CC::DeviceId>,
206    CC: TransmitQueueContext<D, BC> + DeviceSocketHandler<D, BC>,
207>(
208    bindings_ctx: &mut BC,
209    device_id: &CC::DeviceId,
210    inserter: Option<fifo::QueueTxInserter<'_, CC::Meta, CC::Buffer>>,
211    meta: CC::Meta,
212    body: CC::Buffer,
213) -> EnqueueStatus<CC::Meta, CC::Buffer> {
214    match inserter {
215        // No TX queue so send the frame immediately.
216        None => EnqueueStatus::NotAttempted(meta, body),
217        Some(inserter) => {
218            inserter.insert(meta, body).maybe_wake_tx(bindings_ctx, device_id);
219            EnqueueStatus::Attempted
220        }
221    }
222}
223
224// Extracted to a function without the generic serializer parameter to ease code
225// generation.
226fn handle_post_enqueue<
227    D: Device,
228    BC: TransmitQueueBindingsContext<CC::DeviceId>,
229    CC: TransmitQueueContext<D, BC> + DeviceSocketHandler<D, BC>,
230>(
231    core_ctx: &mut CC,
232    bindings_ctx: &mut BC,
233    device_id: &CC::DeviceId,
234    status: EnqueueStatus<CC::Meta, CC::Buffer>,
235) -> Result<(), DeviceSendFrameError> {
236    match status {
237        EnqueueStatus::NotAttempted(meta, body) => {
238            // TODO(https://fxbug.dev/42077654): Deliver the frame to packet
239            // sockets and to the device atomically.
240            deliver_to_device_sockets(core_ctx, bindings_ctx, device_id, &body, &meta);
241            // Send the frame while not holding the TX queue exclusively to
242            // not block concurrent senders from making progress.
243            core_ctx.send_frame(bindings_ctx, device_id, None, meta, body)
244        }
245        EnqueueStatus::Attempted => Ok(()),
246    }
247}
248
249impl<
250    D: Device,
251    BC: TransmitQueueBindingsContext<CC::DeviceId>,
252    CC: TransmitQueueContext<D, BC> + DeviceSocketHandler<D, BC>,
253> TransmitQueueHandler<D, BC> for CC
254where
255    for<'a> &'a mut CC::Allocator: BufferAlloc<CC::Buffer>,
256    CC::Buffer: ReusableBuffer,
257{
258    fn queue_tx_frame<S>(
259        &mut self,
260        bindings_ctx: &mut BC,
261        device_id: &CC::DeviceId,
262        meta: CC::Meta,
263        body: S,
264    ) -> Result<usize, TransmitQueueFrameError<S>>
265    where
266        S: Serializer,
267        S::Buffer: ReusableBuffer,
268    {
269        let (len, result) =
270            self.with_transmit_queue_mut(device_id, |TransmitQueueState { allocator, queue }| {
271                let inserter = match queue {
272                    None => None,
273                    Some(q) => match q.tx_inserter() {
274                        Some(i) => Some(i),
275                        None => return Err(TransmitQueueFrameError::QueueFull(body)),
276                    },
277                };
278                let body = body.serialize_outer(NoReuseBufferProvider(allocator)).map_err(
279                    |(e, serializer)| {
280                        TransmitQueueFrameError::SerializeError(ErrorAndSerializer {
281                            serializer,
282                            error: e.map_alloc(|_| ()),
283                        })
284                    },
285                )?;
286                let len = body.len();
287                let result =
288                    insert_and_notify::<_, _, CC>(bindings_ctx, device_id, inserter, meta, body);
289                Ok((len, result))
290            })?;
291
292        handle_post_enqueue(self, bindings_ctx, device_id, result)
293            .map(|()| len)
294            .map_err(TransmitQueueFrameError::NoQueue)
295    }
296}
297
298/// An allocator of [`Buf<Vec<u8>>`] .
299#[derive(Default)]
300pub struct BufVecU8Allocator;
301
302impl<'a> BufferAlloc<Buf<Vec<u8>>> for &'a mut BufVecU8Allocator {
303    type Error = Never;
304
305    fn alloc(self, len: usize) -> Result<Buf<Vec<u8>>, Self::Error> {
306        new_buf_vec(len)
307    }
308}
309
310#[cfg(test)]
311mod tests {
312    use super::*;
313
314    use alloc::vec;
315
316    use assert_matches::assert_matches;
317    use net_declare::net_mac;
318    use net_types::ethernet::Mac;
319    use netstack3_base::testutil::{
320        FakeBindingsCtx, FakeCoreCtx, FakeLinkDevice, FakeLinkDeviceId,
321    };
322    use netstack3_base::{
323        ContextPair, CounterContext, CtxPair, ResourceCounterContext, WorkQueueReport,
324    };
325    use test_case::test_case;
326
327    use crate::DeviceCounters;
328    use crate::internal::queue::api::TransmitQueueApi;
329    use crate::internal::queue::{BatchSize, MAX_TX_QUEUED_LEN};
330    use crate::internal::socket::{EthernetFrame, Frame};
331
    /// Core-context state backing the fake transmit queue used in tests.
    #[derive(Default)]
    struct FakeTxQueueState {
        /// The transmit queue state under test.
        queue: TransmitQueueState<(), Buf<Vec<u8>>, BufVecU8Allocator>,
        /// Every frame accepted by the fake `send_frame`, paired with the
        /// dequeue context (if any) it was sent with.
        transmitted_packets: Vec<(Buf<Vec<u8>>, Option<DequeueContext>)>,
        /// When `true`, the fake `send_frame` fails with
        /// `DeviceSendFrameError::NoBuffers`.
        no_buffers: bool,
        /// Counters returned by the `CounterContext` implementation.
        stack_wide_device_counters: DeviceCounters,
        /// Counters returned by the `ResourceCounterContext` implementation.
        per_device_counters: DeviceCounters,
    }
340
    /// Bindings-context state recording the side effects observed by tests.
    #[derive(Default)]
    struct FakeTxQueueBindingsCtxState {
        /// One entry per `wake_tx_task` call, in call order.
        woken_tx_tasks: Vec<FakeLinkDeviceId>,
        /// Owned copies of every frame passed to the fake `handle_frame`.
        delivered_to_sockets: Vec<Frame<Vec<u8>>>,
    }
346
    /// The fake core context used by the tests in this module.
    type FakeCoreCtxImpl = FakeCoreCtx<FakeTxQueueState, (), FakeLinkDeviceId>;
    /// The fake bindings context used by the tests in this module.
    type FakeBindingsCtxImpl = FakeBindingsCtx<(), (), FakeTxQueueBindingsCtxState, ()>;
349
350    impl TransmitQueueBindingsContext<FakeLinkDeviceId> for FakeBindingsCtxImpl {
351        fn wake_tx_task(&mut self, device_id: &FakeLinkDeviceId) {
352            self.state.woken_tx_tasks.push(device_id.clone())
353        }
354    }
355
    /// Source MAC address placed in every fake sent Ethernet frame.
    const SRC_MAC: Mac = net_mac!("AA:BB:CC:DD:EE:FF");
    /// Destination MAC address placed in every fake sent Ethernet frame.
    const DEST_MAC: Mac = net_mac!("FF:EE:DD:CC:BB:AA");
358
    /// Marker dequeue context; tests use it to tell frames sent while
    /// dequeueing apart from frames sent directly.
    #[derive(Copy, Clone, Debug, Eq, PartialEq)]
    struct DequeueContext;
361
362    impl TransmitQueueCommon<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
363        type DequeueContext = DequeueContext;
364        type Meta = ();
365        type Buffer = Buf<Vec<u8>>;
366        type Allocator = BufVecU8Allocator;
367
368        fn parse_outgoing_frame<'a, 'b>(
369            buf: &'a [u8],
370            (): &'b Self::Meta,
371        ) -> Result<SentFrame<&'a [u8]>, ParseSentFrameError> {
372            Ok(fake_sent_ethernet_with_body(buf))
373        }
374    }
375
376    fn fake_sent_ethernet_with_body<B>(body: B) -> SentFrame<B> {
377        SentFrame::Ethernet(EthernetFrame {
378            src_mac: SRC_MAC,
379            dst_mac: DEST_MAC,
380            ethertype: None,
381            body_offset: 0,
382            body,
383        })
384    }
385
386    /// A trait providing a shortcut to instantiate a [`TransmitQueueApi`] from a context.
387    trait TransmitQueueApiExt: ContextPair + Sized {
388        fn transmit_queue_api<D>(&mut self) -> TransmitQueueApi<D, &mut Self> {
389            TransmitQueueApi::new(self)
390        }
391    }
392
393    impl<O> TransmitQueueApiExt for O where O: ContextPair + Sized {}
394
395    impl TransmitQueueContext<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
396        fn with_transmit_queue<
397            O,
398            F: FnOnce(&TransmitQueueState<(), Buf<Vec<u8>>, BufVecU8Allocator>) -> O,
399        >(
400            &mut self,
401            &FakeLinkDeviceId: &FakeLinkDeviceId,
402            cb: F,
403        ) -> O {
404            cb(&self.state.queue)
405        }
406
407        fn with_transmit_queue_mut<
408            O,
409            F: FnOnce(&mut TransmitQueueState<(), Buf<Vec<u8>>, BufVecU8Allocator>) -> O,
410        >(
411            &mut self,
412            &FakeLinkDeviceId: &FakeLinkDeviceId,
413            cb: F,
414        ) -> O {
415            cb(&mut self.state.queue)
416        }
417
418        fn send_frame(
419            &mut self,
420            _bindings_ctx: &mut FakeBindingsCtxImpl,
421            &FakeLinkDeviceId: &FakeLinkDeviceId,
422            dequeue_context: Option<&mut DequeueContext>,
423            (): (),
424            buf: Buf<Vec<u8>>,
425        ) -> Result<(), DeviceSendFrameError> {
426            let FakeTxQueueState { transmitted_packets, no_buffers, .. } = &mut self.state;
427            if *no_buffers {
428                Err(DeviceSendFrameError::NoBuffers)
429            } else {
430                Ok(transmitted_packets.push((buf, dequeue_context.map(|c| *c))))
431            }
432        }
433    }
434
435    impl ResourceCounterContext<FakeLinkDeviceId, DeviceCounters> for FakeCoreCtxImpl {
436        fn per_resource_counters<'a>(
437            &'a self,
438            _resource: &'a FakeLinkDeviceId,
439        ) -> &'a DeviceCounters {
440            &self.state.per_device_counters
441        }
442    }
443
444    impl CounterContext<DeviceCounters> for FakeCoreCtxImpl {
445        fn counters(&self) -> &DeviceCounters {
446            &self.state.stack_wide_device_counters
447        }
448    }
449
450    impl TransmitDequeueContext<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
451        type TransmitQueueCtx<'a> = Self;
452
453        fn with_dequed_packets_and_tx_queue_ctx<
454            O,
455            F: FnOnce(
456                &mut DequeueState<Self::Meta, Self::Buffer>,
457                &mut Self::TransmitQueueCtx<'_>,
458            ) -> O,
459        >(
460            &mut self,
461            &FakeLinkDeviceId: &FakeLinkDeviceId,
462            cb: F,
463        ) -> O {
464            cb(&mut DequeueState::default(), self)
465        }
466    }
467
468    impl DeviceSocketHandler<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
469        fn handle_frame(
470            &mut self,
471            bindings_ctx: &mut FakeBindingsCtxImpl,
472            _device: &Self::DeviceId,
473            frame: Frame<&[u8]>,
474            _whole_frame: &[u8],
475        ) {
476            bindings_ctx.state.delivered_to_sockets.push(frame.cloned())
477        }
478    }
479
    // With no queue configured, a frame is delivered to packet sockets and
    // sent to the device immediately, and the TX task is never woken.
    #[test]
    fn noqueue() {
        let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());

        let body = Buf::new(vec![0], ..);

        let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
        assert_eq!(
            TransmitQueueHandler::queue_tx_frame(
                core_ctx,
                bindings_ctx,
                &FakeLinkDeviceId,
                (),
                body.clone(),
            ),
            Ok(body.len())
        );
        let FakeTxQueueBindingsCtxState { woken_tx_tasks, delivered_to_sockets } =
            &bindings_ctx.state;
        // Nothing was enqueued, so no wake-up is expected.
        assert_matches!(&woken_tx_tasks[..], &[]);
        assert_eq!(
            delivered_to_sockets,
            &[Frame::Sent(fake_sent_ethernet_with_body(body.as_ref().into()))]
        );
        // The frame went straight to the device, without a dequeue context.
        assert_eq!(core::mem::take(&mut core_ctx.state.transmitted_packets), [(body, None)]);

        // Should not have any frames waiting to be transmitted since we have no
        // queue.
        assert_eq!(
            ctx.transmit_queue_api().transmit_queued_frames(
                &FakeLinkDeviceId,
                BatchSize::default(),
                &mut DequeueContext,
            ),
            Ok(WorkQueueReport::AllDone),
        );

        let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
        assert_matches!(&bindings_ctx.state.woken_tx_tasks[..], &[]);
        assert_eq!(core::mem::take(&mut core_ctx.state.transmitted_packets), []);
    }
521
    // Fills the FIFO queue to capacity, checks that one more frame is rejected
    // as `QueueFull`, then drains the queue in batches of `batch_size`. The
    // whole cycle runs twice to check the queue is reusable after draining.
    #[test_case(BatchSize::MAX)]
    #[test_case(BatchSize::MAX/2)]
    fn fifo_queue_and_dequeue(batch_size: usize) {
        let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());

        ctx.transmit_queue_api()
            .set_configuration(&FakeLinkDeviceId, TransmitQueueConfiguration::Fifo);

        for _ in 0..2 {
            let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
            for i in 0..MAX_TX_QUEUED_LEN {
                let body = Buf::new(vec![i as u8], ..);
                assert_eq!(
                    TransmitQueueHandler::queue_tx_frame(
                        core_ctx,
                        bindings_ctx,
                        &FakeLinkDeviceId,
                        (),
                        body
                    ),
                    Ok(1)
                );
                // We should only ever be woken up once when the first packet
                // was enqueued.
                assert_eq!(bindings_ctx.state.woken_tx_tasks, [FakeLinkDeviceId]);
            }

            // The queue is full now, so this frame must be rejected and
            // returned to the caller.
            let body = Buf::new(vec![131], ..);
            assert_eq!(
                TransmitQueueHandler::queue_tx_frame(
                    core_ctx,
                    bindings_ctx,
                    &FakeLinkDeviceId,
                    (),
                    body.clone(),
                ),
                Err(TransmitQueueFrameError::QueueFull(body))
            );

            let FakeTxQueueBindingsCtxState { woken_tx_tasks, delivered_to_sockets } =
                &mut bindings_ctx.state;
            // We should only ever be woken up once when the first packet
            // was enqueued.
            assert_eq!(core::mem::take(woken_tx_tasks), [FakeLinkDeviceId]);
            // No frames should be delivered to packet sockets before transmit.
            assert_eq!(core::mem::take(delivered_to_sockets), &[]);

            // Drain all but the final batch; each call should report more
            // work pending and transmit exactly `batch_size` frames in order.
            assert!(MAX_TX_QUEUED_LEN > batch_size);
            for i in (0..(MAX_TX_QUEUED_LEN - batch_size)).step_by(batch_size) {
                assert_eq!(
                    ctx.transmit_queue_api().transmit_queued_frames(
                        &FakeLinkDeviceId,
                        BatchSize::new_saturating(batch_size),
                        &mut DequeueContext
                    ),
                    Ok(WorkQueueReport::Pending),
                );
                assert_eq!(
                    core::mem::take(&mut ctx.core_ctx.state.transmitted_packets),
                    (i..i + batch_size)
                        .map(|i| (Buf::new(vec![i as u8], ..), Some(DequeueContext)))
                        .collect::<Vec<_>>()
                );
            }

            // The final call drains whatever is left and reports completion.
            assert_eq!(
                ctx.transmit_queue_api().transmit_queued_frames(
                    &FakeLinkDeviceId,
                    BatchSize::new_saturating(batch_size),
                    &mut DequeueContext
                ),
                Ok(WorkQueueReport::AllDone),
            );

            let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
            assert_eq!(
                core::mem::take(&mut core_ctx.state.transmitted_packets),
                (batch_size * (MAX_TX_QUEUED_LEN / batch_size - 1)..MAX_TX_QUEUED_LEN)
                    .map(|i| (Buf::new(vec![i as u8], ..), Some(DequeueContext)))
                    .collect::<Vec<_>>()
            );
            // Should not have woken up the TX task since the queue should be
            // empty.
            let FakeTxQueueBindingsCtxState { woken_tx_tasks, delivered_to_sockets } =
                &mut bindings_ctx.state;
            assert_matches!(&core::mem::take(woken_tx_tasks)[..], &[]);

            // Every queued frame should have been delivered to packet sockets
            // while dequeueing. The queue should now be empty so the next
            // iteration of queueing `MAX_TX_QUEUED_LEN` packets should succeed.
            assert_eq!(
                core::mem::take(delivered_to_sockets),
                (0..MAX_TX_QUEUED_LEN)
                    .map(|i| Frame::Sent(fake_sent_ethernet_with_body(vec![i as u8])))
                    .collect::<Vec<_>>()
            );
        }
    }
619
    // A frame whose transmission fails with `NoBuffers` while dequeueing is
    // still delivered to packet sockets, and is dropped rather than retried.
    #[test]
    fn dequeue_error() {
        let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());

        ctx.transmit_queue_api()
            .set_configuration(&FakeLinkDeviceId, TransmitQueueConfiguration::Fifo);

        let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
        let body = Buf::new(vec![0], ..);
        assert_eq!(
            TransmitQueueHandler::queue_tx_frame(
                core_ctx,
                bindings_ctx,
                &FakeLinkDeviceId,
                (),
                body.clone(),
            ),
            Ok(body.len())
        );
        // The frame is queued (hence the wake-up) but not yet transmitted.
        assert_eq!(core::mem::take(&mut bindings_ctx.state.woken_tx_tasks), [FakeLinkDeviceId]);
        assert_eq!(core_ctx.state.transmitted_packets, []);

        // Make the fake device refuse frames and attempt to dequeue.
        core_ctx.state.no_buffers = true;
        assert_eq!(
            ctx.transmit_queue_api().transmit_queued_frames(
                &FakeLinkDeviceId,
                BatchSize::default(),
                &mut DequeueContext
            ),
            Err(DeviceSendFrameError::NoBuffers),
        );
        let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
        assert_eq!(core_ctx.state.transmitted_packets, []);
        let FakeTxQueueBindingsCtxState { woken_tx_tasks, delivered_to_sockets } =
            &bindings_ctx.state;
        assert_matches!(&woken_tx_tasks[..], &[]);
        // Frames were delivered to packet sockets before the device was found
        // to not be ready.
        assert_eq!(
            delivered_to_sockets,
            &[Frame::Sent(fake_sent_ethernet_with_body(body.as_ref().into()))]
        );

        // Let the device accept frames again; nothing should be left to send.
        core_ctx.state.no_buffers = false;
        assert_eq!(
            ctx.transmit_queue_api().transmit_queued_frames(
                &FakeLinkDeviceId,
                BatchSize::default(),
                &mut DequeueContext
            ),
            Ok(WorkQueueReport::AllDone),
        );
        let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
        assert_matches!(&bindings_ctx.state.woken_tx_tasks[..], &[]);
        // The packet that failed to dequeue is dropped.
        assert_eq!(core::mem::take(&mut core_ctx.state.transmitted_packets), []);
    }
677
    // Switching the configuration from `Fifo` to `None` drains the frame that
    // is already queued: it is delivered to packet sockets and, if the device
    // has buffers, transmitted without a dequeue context.
    #[test_case(true; "device no buffers")]
    #[test_case(false; "device has buffers")]
    fn drain_before_noqueue(no_buffers: bool) {
        let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());

        ctx.transmit_queue_api()
            .set_configuration(&FakeLinkDeviceId, TransmitQueueConfiguration::Fifo);

        let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
        let body = Buf::new(vec![0], ..);
        assert_eq!(
            TransmitQueueHandler::queue_tx_frame(
                core_ctx,
                bindings_ctx,
                &FakeLinkDeviceId,
                (),
                body.clone(),
            ),
            Ok(body.len())
        );
        // The frame is queued (hence the wake-up) but not yet transmitted.
        assert_eq!(core::mem::take(&mut bindings_ctx.state.woken_tx_tasks), [FakeLinkDeviceId]);
        assert_eq!(core_ctx.state.transmitted_packets, []);

        // Removing the queue is what triggers the drain.
        core_ctx.state.no_buffers = no_buffers;
        ctx.transmit_queue_api()
            .set_configuration(&FakeLinkDeviceId, TransmitQueueConfiguration::None);

        let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
        let FakeTxQueueBindingsCtxState { woken_tx_tasks, delivered_to_sockets } =
            &bindings_ctx.state;
        assert_matches!(&woken_tx_tasks[..], &[]);
        // Socket delivery happens whether or not the device accepts the frame.
        assert_eq!(
            delivered_to_sockets,
            &[Frame::Sent(fake_sent_ethernet_with_body(body.as_ref().into()))]
        );
        if no_buffers {
            assert_eq!(core_ctx.state.transmitted_packets, []);
        } else {
            assert_eq!(core::mem::take(&mut core_ctx.state.transmitted_packets), [(body, None)]);
        }
    }
719
    // `count` reports `None` without a queue, and the number of queued frames
    // once a FIFO queue is configured.
    #[test]
    fn count() {
        let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());
        // No queue configured yet.
        assert_eq!(ctx.transmit_queue_api().count(&FakeLinkDeviceId), None);

        ctx.transmit_queue_api()
            .set_configuration(&FakeLinkDeviceId, TransmitQueueConfiguration::Fifo);

        // A freshly configured queue is empty.
        assert_eq!(ctx.transmit_queue_api().count(&FakeLinkDeviceId), Some(0));

        let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
        let body = Buf::new(vec![0], ..);
        assert_eq!(
            TransmitQueueHandler::queue_tx_frame(
                core_ctx,
                bindings_ctx,
                &FakeLinkDeviceId,
                (),
                body,
            ),
            Ok(1)
        );

        // The queued frame is reflected in the count.
        assert_eq!(ctx.transmit_queue_api().count(&FakeLinkDeviceId), Some(1));
    }
745}