1use core::convert::Infallible as Never;
8
9use alloc::vec::Vec;
10
11use derivative::Derivative;
12use log::trace;
13use netstack3_base::sync::Mutex;
14use netstack3_base::{
15 ChecksumOffloadResult, ChecksumOffloadSpec, Device, DeviceIdContext, ErrorAndSerializer,
16 NetworkSerializationContext, NetworkSerializer, TxMetadata,
17};
18use packet::{Buf, FragmentedBuffer as _, NoReuseBufferProvider, ReusableBuffer, new_buf_vec};
19
20use crate::internal::base::{DeviceBufferBindingsTypes, DeviceSendFrameError};
21use crate::internal::queue::{
22 DequeueState, DeviceBufferSpec, EnqueueResult, TransmitQueueFrameError, fifo,
23};
24use crate::internal::socket::{DeviceSocketHandler, ParseSentFrameError, SentFrame};
25
26#[derive(Derivative)]
28#[derivative(Default(bound = "Allocator: Default"))]
29pub struct TransmitQueueState<Meta, Buffer, Allocator> {
30 pub(super) allocator: ByteSliceAllocator<Allocator, Buffer>,
31 pub(super) queue: Option<fifo::Queue<Meta, Buffer>>,
32 pub(super) checksum_offload_spec: ChecksumOffloadSpec,
33}
34
35pub struct TransmitQueue<Meta, Buffer, Allocator> {
37 pub(crate) deque: Mutex<DequeueState<Meta, Buffer>>,
41 pub(crate) queue: Mutex<TransmitQueueState<Meta, Buffer, Allocator>>,
46}
47
48impl<Meta, Buffer, Allocator> TransmitQueue<Meta, Buffer, Allocator> {
49 pub(crate) fn new(allocator: Allocator, checksum_offload_spec: ChecksumOffloadSpec) -> Self {
50 Self {
51 deque: Mutex::new(DequeueState::default()),
52 queue: Mutex::new(TransmitQueueState {
53 allocator: ByteSliceAllocator::new(allocator),
54 queue: None,
55 checksum_offload_spec,
56 }),
57 }
58 }
59}
60
61pub trait TransmitQueueBindingsContext<DeviceId>: DeviceBufferBindingsTypes {
63 fn wake_tx_task(&mut self, device_id: &DeviceId);
70}
71
72pub trait TxQueuePacketMetadataCommon {
74 fn set_checksum_offload_result(&mut self, result: Option<ChecksumOffloadResult>);
76}
77
78impl<T: TxMetadata> TxQueuePacketMetadataCommon for T {
79 fn set_checksum_offload_result(&mut self, result: Option<ChecksumOffloadResult>) {
80 TxMetadata::set_checksum_offload_result(self, result);
81 }
82}
83
84pub trait TransmitQueueCommon<D: Device, C>: DeviceIdContext<D> {
86 type Meta: TxQueuePacketMetadataCommon;
88
89 type DequeueContext;
91
92 fn parse_outgoing_frame<'a, 'b>(
94 buf: &'a [u8],
95 meta: &'a Self::Meta,
96 ) -> Result<SentFrame<&'a [u8]>, ParseSentFrameError>;
97}
98
99pub trait TransmitQueueContext<D: DeviceBufferSpec<BC>, BC>: TransmitQueueCommon<D, BC> {
101 fn with_transmit_queue_mut<
103 O,
104 F: FnOnce(&mut TransmitQueueState<Self::Meta, D::TxBuffer, D::TxAllocator>) -> O,
105 >(
106 &mut self,
107 device_id: &Self::DeviceId,
108 cb: F,
109 ) -> O;
110
111 fn with_transmit_queue<
113 O,
114 F: FnOnce(&TransmitQueueState<Self::Meta, D::TxBuffer, D::TxAllocator>) -> O,
115 >(
116 &mut self,
117 device_id: &Self::DeviceId,
118 cb: F,
119 ) -> O;
120
121 fn send_frame(
126 &mut self,
127 bindings_ctx: &mut BC,
128 device_id: &Self::DeviceId,
129 dequeue_context: Option<&mut Self::DequeueContext>,
130 meta: Self::Meta,
131 buf: D::TxBuffer,
132 ) -> Result<(), DeviceSendFrameError>;
133}
134
135pub trait TransmitDequeueContext<D: DeviceBufferSpec<BC>, BC>: TransmitQueueContext<D, BC> {
137 type TransmitQueueCtx<'a>: TransmitQueueContext<
139 D,
140 BC,
141 Meta = Self::Meta,
142 DequeueContext = Self::DequeueContext,
143 DeviceId = Self::DeviceId,
144 > + DeviceSocketHandler<D, BC>;
145
146 fn with_dequed_packets_and_tx_queue_ctx<
148 O,
149 F: FnOnce(&mut DequeueState<Self::Meta, D::TxBuffer>, &mut Self::TransmitQueueCtx<'_>) -> O,
150 >(
151 &mut self,
152 device_id: &Self::DeviceId,
153 cb: F,
154 ) -> O;
155}
156
157pub enum TransmitQueueConfiguration {
159 None,
161 Fifo,
163}
164
165pub trait TransmitQueueHandler<D: Device, BC>: TransmitQueueCommon<D, BC> {
167 fn queue_tx_frame<S>(
169 &mut self,
170 bindings_ctx: &mut BC,
171 device_id: &Self::DeviceId,
172 meta: Self::Meta,
173 body: S,
174 ) -> Result<usize, TransmitQueueFrameError<S>>
175 where
176 S: NetworkSerializer,
177 S::Buffer: ReusableBuffer;
178}
179
180pub(super) fn deliver_to_device_sockets<
181 D: DeviceBufferSpec<BC>,
182 BC: TransmitQueueBindingsContext<CC::DeviceId>,
183 CC: TransmitQueueCommon<D, BC> + DeviceSocketHandler<D, BC>,
184>(
185 core_ctx: &mut CC,
186 bindings_ctx: &mut BC,
187 device_id: &CC::DeviceId,
188 buffer: &D::TxBuffer,
189 meta: &CC::Meta,
190) {
191 buffer.with_bytes(|b| {
192 let mut handle_parsed = |bytes: &[u8]| match CC::parse_outgoing_frame(bytes, meta) {
193 Ok(sent_frame) => DeviceSocketHandler::handle_frame(
194 core_ctx,
195 bindings_ctx,
196 device_id,
197 sent_frame.into(),
198 bytes,
199 ),
200 Err(ParseSentFrameError) => {
201 trace!("failed to parse outgoing frame on {:?} ({} bytes)", device_id, bytes.len())
202 }
203 };
204
205 if let Some(bytes) = b.try_get_contiguous() {
206 handle_parsed(bytes)
207 } else {
208 handle_parsed(&b.to_flattened_vec())
209 }
210 })
211}
212
213impl EnqueueResult {
214 fn maybe_wake_tx<D, BC: TransmitQueueBindingsContext<D>>(
215 self,
216 bindings_ctx: &mut BC,
217 device_id: &D,
218 ) {
219 match self {
220 Self::QueuePreviouslyWasOccupied => (),
221 Self::QueueWasPreviouslyEmpty => bindings_ctx.wake_tx_task(device_id),
222 }
223 }
224}
225
226enum EnqueueStatus<Meta, Buffer> {
227 NotAttempted(Meta, Buffer),
228 Attempted,
229}
230
231fn insert_and_notify<
234 D: DeviceBufferSpec<BC>,
235 BC: TransmitQueueBindingsContext<CC::DeviceId>,
236 CC: TransmitQueueContext<D, BC> + DeviceSocketHandler<D, BC>,
237>(
238 bindings_ctx: &mut BC,
239 device_id: &CC::DeviceId,
240 inserter: Option<fifo::QueueTxInserter<'_, CC::Meta, D::TxBuffer>>,
241 meta: CC::Meta,
242 body: D::TxBuffer,
243) -> EnqueueStatus<CC::Meta, D::TxBuffer> {
244 match inserter {
245 None => EnqueueStatus::NotAttempted(meta, body),
247 Some(inserter) => {
248 inserter.insert(meta, body).maybe_wake_tx(bindings_ctx, device_id);
249 EnqueueStatus::Attempted
250 }
251 }
252}
253
254fn handle_post_enqueue<
257 D: DeviceBufferSpec<BC>,
258 BC: TransmitQueueBindingsContext<CC::DeviceId>,
259 CC: TransmitQueueContext<D, BC> + DeviceSocketHandler<D, BC>,
260>(
261 core_ctx: &mut CC,
262 bindings_ctx: &mut BC,
263 device_id: &CC::DeviceId,
264 status: EnqueueStatus<CC::Meta, D::TxBuffer>,
265) -> Result<(), DeviceSendFrameError> {
266 match status {
267 EnqueueStatus::NotAttempted(meta, body) => {
268 deliver_to_device_sockets(core_ctx, bindings_ctx, device_id, &body, &meta);
271 core_ctx.send_frame(bindings_ctx, device_id, None, meta, body)
274 }
275 EnqueueStatus::Attempted => Ok(()),
276 }
277}
278
279impl<
280 D: DeviceBufferSpec<BC>,
281 BC: TransmitQueueBindingsContext<CC::DeviceId>,
282 CC: TransmitQueueContext<D, BC> + DeviceSocketHandler<D, BC>,
283> TransmitQueueHandler<D, BC> for CC
284{
285 fn queue_tx_frame<S>(
286 &mut self,
287 bindings_ctx: &mut BC,
288 device_id: &CC::DeviceId,
289 mut meta: CC::Meta,
290 body: S,
291 ) -> Result<usize, TransmitQueueFrameError<S>>
292 where
293 S: NetworkSerializer,
294 S::Buffer: ReusableBuffer,
295 {
296 let (len, result) = self.with_transmit_queue_mut(
297 device_id,
298 |TransmitQueueState { allocator, queue, checksum_offload_spec }| {
299 let queue_len = queue.as_ref().map_or(0, |q| q.len());
300 let inserter = match queue {
301 None => None,
302 Some(q) => match q.tx_inserter() {
303 Some(i) => Some(i),
304 None => return Err(TransmitQueueFrameError::QueueFull(body)),
305 },
306 };
307 let mut context = NetworkSerializationContext::new(checksum_offload_spec.clone());
308 let body = body
309 .serialize_outer(
310 &mut context,
311 NoReuseBufferProvider(BufferAllocAdaptor {
312 allocator: &mut *allocator,
313 queue_len,
314 }),
315 )
316 .map_err(|(e, serializer)| {
317 TransmitQueueFrameError::SerializeError(ErrorAndSerializer {
318 serializer,
319 error: e.map_alloc(
320 |_: <D::TxAllocator as TxBufferAllocator<D::TxBuffer>>::Error| (),
321 ),
322 })
323 })?;
324 meta.set_checksum_offload_result(context.csum_offload_result());
325 let len = body.len();
326 let result = insert_and_notify::<_, _, CC>(
327 bindings_ctx,
328 device_id,
329 inserter,
330 meta,
331 allocator.get_buffer(),
332 );
333 Ok((len, result))
334 },
335 )?;
336
337 handle_post_enqueue(self, bindings_ctx, device_id, result)
338 .map(|()| len)
339 .map_err(TransmitQueueFrameError::NoQueue)
340 }
341}
342
343pub trait TxBufferAllocator<Buffer> {
345 type Error;
347
348 fn alloc(&mut self, len: usize, queue_len: usize) -> Result<Buffer, Self::Error>;
351}
352
353#[derive(Derivative)]
358#[derivative(Default(bound = "A: Default"))]
359pub(super) struct ByteSliceAllocator<A, B> {
360 allocator: A,
361 buffer: Option<B>,
362}
363
364impl<A, B> ByteSliceAllocator<A, B> {
365 fn new(allocator: A) -> Self {
366 Self { allocator, buffer: None }
367 }
368}
369
370impl<A: TxBufferAllocator<B>, B: AsMut<[u8]>> ByteSliceAllocator<A, B> {
371 fn alloc_buf_slice(
373 &mut self,
374 len: usize,
375 queue_len: usize,
376 ) -> Result<Buf<&mut [u8]>, A::Error> {
377 let old = self.buffer.replace(self.allocator.alloc(len, queue_len)?);
378 debug_assert!(old.is_none());
379 Ok(Buf::new(&mut self.buffer.as_mut().expect("must be set").as_mut()[..len], ..))
382 }
383
384 fn get_buffer(&mut self) -> B {
387 self.buffer.take().expect("must be allocated")
388 }
389}
390
391struct BufferAllocAdaptor<'a, A, B> {
392 allocator: &'a mut ByteSliceAllocator<A, B>,
393 queue_len: usize,
394}
395
396impl<'a, A, B> packet::BufferAlloc<Buf<&'a mut [u8]>> for BufferAllocAdaptor<'a, A, B>
397where
398 B: AsMut<[u8]>,
399 A: TxBufferAllocator<B>,
400{
401 type Error = A::Error;
402
403 fn alloc(self, len: usize) -> Result<Buf<&'a mut [u8]>, Self::Error> {
404 self.allocator.alloc_buf_slice(len, self.queue_len)
405 }
406}
407
408#[derive(Default)]
410pub struct BufVecU8Allocator;
411
412impl TxBufferAllocator<Buf<Vec<u8>>> for BufVecU8Allocator {
413 type Error = Never;
414
415 fn alloc(&mut self, len: usize, _qlen: usize) -> Result<Buf<Vec<u8>>, Self::Error> {
416 new_buf_vec(len)
417 }
418}
419
420#[cfg(test)]
421mod tests {
422 use super::*;
423
424 use alloc::vec;
425
426 use assert_matches::assert_matches;
427 use net_declare::net_mac;
428 use net_types::ethernet::Mac;
429 use netstack3_base::testutil::{
430 FakeBindingsCtx, FakeCoreCtx, FakeLinkDevice, FakeLinkDeviceId, FakeTxMetadata,
431 };
432
433 impl<BT> DeviceBufferSpec<BT> for FakeLinkDevice {
434 type TxBuffer = packet::Buf<Vec<u8>>;
435 type TxAllocator = BufVecU8Allocator;
436 }
437 use netstack3_base::{
438 ContextPair, CounterContext, CtxPair, ResourceCounterContext, WorkQueueReport,
439 };
440 use test_case::test_case;
441
442 use crate::DeviceCounters;
443 use crate::internal::queue::api::TransmitQueueApi;
444 use crate::internal::queue::{BatchSize, MAX_TX_QUEUED_LEN};
445 use crate::internal::socket::{EthernetFrame, Frame};
446
447 #[derive(Default)]
448 struct FakeTxQueueState {
449 queue: TransmitQueueState<FakeTxMetadata, Buf<Vec<u8>>, BufVecU8Allocator>,
450 transmitted_packets: Vec<(Buf<Vec<u8>>, Option<DequeueContext>)>,
451 no_buffers: bool,
452 stack_wide_device_counters: DeviceCounters,
453 per_device_counters: DeviceCounters,
454 }
455
456 #[derive(Default)]
457 struct FakeTxQueueBindingsCtxState {
458 woken_tx_tasks: Vec<FakeLinkDeviceId>,
459 delivered_to_sockets: Vec<Frame<Vec<u8>>>,
460 }
461
462 type FakeCoreCtxImpl = FakeCoreCtx<FakeTxQueueState, FakeTxMetadata, FakeLinkDeviceId>;
463 type FakeBindingsCtxImpl = FakeBindingsCtx<(), (), FakeTxQueueBindingsCtxState, ()>;
464
465 impl TransmitQueueBindingsContext<FakeLinkDeviceId> for FakeBindingsCtxImpl {
466 fn wake_tx_task(&mut self, device_id: &FakeLinkDeviceId) {
467 self.state.woken_tx_tasks.push(device_id.clone())
468 }
469 }
470
471 const SRC_MAC: Mac = net_mac!("AA:BB:CC:DD:EE:FF");
472 const DEST_MAC: Mac = net_mac!("FF:EE:DD:CC:BB:AA");
473
474 #[derive(Copy, Clone, Debug, Eq, PartialEq)]
475 struct DequeueContext;
476
477 impl TransmitQueueCommon<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
478 type DequeueContext = DequeueContext;
479 type Meta = FakeTxMetadata;
480
481 fn parse_outgoing_frame<'a, 'b>(
482 buf: &'a [u8],
483 _meta: &'b Self::Meta,
484 ) -> Result<SentFrame<&'a [u8]>, ParseSentFrameError> {
485 Ok(fake_sent_ethernet_with_body(buf))
486 }
487 }
488
489 fn fake_sent_ethernet_with_body<B>(body: B) -> SentFrame<B> {
490 SentFrame::Ethernet(EthernetFrame {
491 src_mac: SRC_MAC,
492 dst_mac: DEST_MAC,
493 ethertype: None,
494 body_offset: 0,
495 body,
496 })
497 }
498
499 trait TransmitQueueApiExt: ContextPair + Sized {
501 fn transmit_queue_api<D>(&mut self) -> TransmitQueueApi<D, &mut Self> {
502 TransmitQueueApi::new(self)
503 }
504 }
505
506 impl<O> TransmitQueueApiExt for O where O: ContextPair + Sized {}
507
508 impl TransmitQueueContext<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
509 fn with_transmit_queue<
510 O,
511 F: FnOnce(&TransmitQueueState<FakeTxMetadata, Buf<Vec<u8>>, BufVecU8Allocator>) -> O,
512 >(
513 &mut self,
514 &FakeLinkDeviceId: &FakeLinkDeviceId,
515 cb: F,
516 ) -> O {
517 cb(&self.state.queue)
518 }
519
520 fn with_transmit_queue_mut<
521 O,
522 F: FnOnce(&mut TransmitQueueState<FakeTxMetadata, Buf<Vec<u8>>, BufVecU8Allocator>) -> O,
523 >(
524 &mut self,
525 &FakeLinkDeviceId: &FakeLinkDeviceId,
526 cb: F,
527 ) -> O {
528 cb(&mut self.state.queue)
529 }
530
531 fn send_frame(
532 &mut self,
533 _bindings_ctx: &mut FakeBindingsCtxImpl,
534 &FakeLinkDeviceId: &FakeLinkDeviceId,
535 dequeue_context: Option<&mut DequeueContext>,
536 _meta: FakeTxMetadata,
537 buf: Buf<Vec<u8>>,
538 ) -> Result<(), DeviceSendFrameError> {
539 let FakeTxQueueState { transmitted_packets, no_buffers, .. } = &mut self.state;
540 if *no_buffers {
541 Err(DeviceSendFrameError::NoBuffers)
542 } else {
543 Ok(transmitted_packets.push((buf, dequeue_context.map(|c| *c))))
544 }
545 }
546 }
547
548 impl ResourceCounterContext<FakeLinkDeviceId, DeviceCounters> for FakeCoreCtxImpl {
549 fn per_resource_counters<'a>(
550 &'a self,
551 _resource: &'a FakeLinkDeviceId,
552 ) -> &'a DeviceCounters {
553 &self.state.per_device_counters
554 }
555 }
556
557 impl CounterContext<DeviceCounters> for FakeCoreCtxImpl {
558 fn counters(&self) -> &DeviceCounters {
559 &self.state.stack_wide_device_counters
560 }
561 }
562
563 impl TransmitDequeueContext<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
564 type TransmitQueueCtx<'a> = Self;
565
566 fn with_dequed_packets_and_tx_queue_ctx<
567 O,
568 F: FnOnce(
569 &mut DequeueState<FakeTxMetadata, Buf<Vec<u8>>>,
570 &mut Self::TransmitQueueCtx<'_>,
571 ) -> O,
572 >(
573 &mut self,
574 &FakeLinkDeviceId: &FakeLinkDeviceId,
575 cb: F,
576 ) -> O {
577 cb(&mut DequeueState::default(), self)
578 }
579 }
580
581 impl DeviceSocketHandler<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
582 fn handle_frame(
583 &mut self,
584 bindings_ctx: &mut FakeBindingsCtxImpl,
585 _device: &Self::DeviceId,
586 frame: Frame<&[u8]>,
587 _whole_frame: &[u8],
588 ) {
589 bindings_ctx.state.delivered_to_sockets.push(frame.cloned())
590 }
591 }
592
593 #[test]
594 fn noqueue() {
595 let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());
596
597 let body = Buf::new(vec![0], ..);
598
599 let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
600 assert_eq!(
601 TransmitQueueHandler::queue_tx_frame(
602 core_ctx,
603 bindings_ctx,
604 &FakeLinkDeviceId,
605 FakeTxMetadata,
606 body.clone(),
607 ),
608 Ok(body.len())
609 );
610 let FakeTxQueueBindingsCtxState { woken_tx_tasks, delivered_to_sockets } =
611 &bindings_ctx.state;
612 assert_matches!(&woken_tx_tasks[..], &[]);
613 assert_eq!(
614 delivered_to_sockets,
615 &[Frame::Sent(fake_sent_ethernet_with_body(body.as_ref().into()))]
616 );
617 assert_eq!(core::mem::take(&mut core_ctx.state.transmitted_packets), [(body, None)]);
618
619 assert_eq!(
622 ctx.transmit_queue_api().transmit_queued_frames(
623 &FakeLinkDeviceId,
624 BatchSize::default(),
625 &mut DequeueContext,
626 ),
627 Ok(WorkQueueReport::AllDone),
628 );
629
630 let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
631 assert_matches!(&bindings_ctx.state.woken_tx_tasks[..], &[]);
632 assert_eq!(core::mem::take(&mut core_ctx.state.transmitted_packets), []);
633 }
634
635 #[test_case(BatchSize::MAX)]
636 #[test_case(BatchSize::MAX/2)]
637 fn fifo_queue_and_dequeue(batch_size: usize) {
638 let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());
639
640 ctx.transmit_queue_api()
641 .set_configuration(&FakeLinkDeviceId, TransmitQueueConfiguration::Fifo);
642
643 for _ in 0..2 {
644 let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
645 for i in 0..MAX_TX_QUEUED_LEN {
646 let body = Buf::new(vec![i as u8], ..);
647 assert_eq!(
648 TransmitQueueHandler::queue_tx_frame(
649 core_ctx,
650 bindings_ctx,
651 &FakeLinkDeviceId,
652 FakeTxMetadata,
653 body
654 ),
655 Ok(1)
656 );
657 assert_eq!(bindings_ctx.state.woken_tx_tasks, [FakeLinkDeviceId]);
660 }
661
662 let body = Buf::new(vec![131], ..);
663 assert_eq!(
664 TransmitQueueHandler::queue_tx_frame(
665 core_ctx,
666 bindings_ctx,
667 &FakeLinkDeviceId,
668 FakeTxMetadata,
669 body.clone(),
670 ),
671 Err(TransmitQueueFrameError::QueueFull(body))
672 );
673
674 let FakeTxQueueBindingsCtxState { woken_tx_tasks, delivered_to_sockets } =
675 &mut bindings_ctx.state;
676 assert_eq!(core::mem::take(woken_tx_tasks), [FakeLinkDeviceId]);
679 assert_eq!(core::mem::take(delivered_to_sockets), &[]);
681
682 assert!(MAX_TX_QUEUED_LEN > batch_size);
683 for i in (0..(MAX_TX_QUEUED_LEN - batch_size)).step_by(batch_size) {
684 assert_eq!(
685 ctx.transmit_queue_api().transmit_queued_frames(
686 &FakeLinkDeviceId,
687 BatchSize::new_saturating(batch_size),
688 &mut DequeueContext
689 ),
690 Ok(WorkQueueReport::Pending),
691 );
692 assert_eq!(
693 core::mem::take(&mut ctx.core_ctx.state.transmitted_packets),
694 (i..i + batch_size)
695 .map(|i| (Buf::new(vec![i as u8], ..), Some(DequeueContext)))
696 .collect::<Vec<_>>()
697 );
698 }
699
700 assert_eq!(
701 ctx.transmit_queue_api().transmit_queued_frames(
702 &FakeLinkDeviceId,
703 BatchSize::new_saturating(batch_size),
704 &mut DequeueContext
705 ),
706 Ok(WorkQueueReport::AllDone),
707 );
708
709 let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
710 assert_eq!(
711 core::mem::take(&mut core_ctx.state.transmitted_packets),
712 (batch_size * (MAX_TX_QUEUED_LEN / batch_size - 1)..MAX_TX_QUEUED_LEN)
713 .map(|i| (Buf::new(vec![i as u8], ..), Some(DequeueContext)))
714 .collect::<Vec<_>>()
715 );
716 let FakeTxQueueBindingsCtxState { woken_tx_tasks, delivered_to_sockets } =
719 &mut bindings_ctx.state;
720 assert_matches!(&core::mem::take(woken_tx_tasks)[..], &[]);
721
722 assert_eq!(
725 core::mem::take(delivered_to_sockets),
726 (0..MAX_TX_QUEUED_LEN)
727 .map(|i| Frame::Sent(fake_sent_ethernet_with_body(vec![i as u8])))
728 .collect::<Vec<_>>()
729 );
730 }
731 }
732
733 #[test]
734 fn dequeue_error() {
735 let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());
736
737 ctx.transmit_queue_api()
738 .set_configuration(&FakeLinkDeviceId, TransmitQueueConfiguration::Fifo);
739
740 let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
741 let body = Buf::new(vec![0], ..);
742 assert_eq!(
743 TransmitQueueHandler::queue_tx_frame(
744 core_ctx,
745 bindings_ctx,
746 &FakeLinkDeviceId,
747 FakeTxMetadata,
748 body.clone(),
749 ),
750 Ok(body.len())
751 );
752 assert_eq!(core::mem::take(&mut bindings_ctx.state.woken_tx_tasks), [FakeLinkDeviceId]);
753 assert_eq!(core_ctx.state.transmitted_packets, []);
754
755 core_ctx.state.no_buffers = true;
756 assert_eq!(
757 ctx.transmit_queue_api().transmit_queued_frames(
758 &FakeLinkDeviceId,
759 BatchSize::default(),
760 &mut DequeueContext
761 ),
762 Err(DeviceSendFrameError::NoBuffers),
763 );
764 let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
765 assert_eq!(core_ctx.state.transmitted_packets, []);
766 let FakeTxQueueBindingsCtxState { woken_tx_tasks, delivered_to_sockets } =
767 &bindings_ctx.state;
768 assert_matches!(&woken_tx_tasks[..], &[]);
769 assert_eq!(
772 delivered_to_sockets,
773 &[Frame::Sent(fake_sent_ethernet_with_body(body.as_ref().into()))]
774 );
775
776 core_ctx.state.no_buffers = false;
777 assert_eq!(
778 ctx.transmit_queue_api().transmit_queued_frames(
779 &FakeLinkDeviceId,
780 BatchSize::default(),
781 &mut DequeueContext
782 ),
783 Ok(WorkQueueReport::AllDone),
784 );
785 let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
786 assert_matches!(&bindings_ctx.state.woken_tx_tasks[..], &[]);
787 assert_eq!(core::mem::take(&mut core_ctx.state.transmitted_packets), []);
789 }
790
791 #[test_case(true; "device no buffers")]
792 #[test_case(false; "device has buffers")]
793 fn drain_before_noqueue(no_buffers: bool) {
794 let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());
795
796 ctx.transmit_queue_api()
797 .set_configuration(&FakeLinkDeviceId, TransmitQueueConfiguration::Fifo);
798
799 let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
800 let body = Buf::new(vec![0], ..);
801 assert_eq!(
802 TransmitQueueHandler::queue_tx_frame(
803 core_ctx,
804 bindings_ctx,
805 &FakeLinkDeviceId,
806 FakeTxMetadata,
807 body.clone(),
808 ),
809 Ok(body.len())
810 );
811 assert_eq!(core::mem::take(&mut bindings_ctx.state.woken_tx_tasks), [FakeLinkDeviceId]);
812 assert_eq!(core_ctx.state.transmitted_packets, []);
813
814 core_ctx.state.no_buffers = no_buffers;
815 ctx.transmit_queue_api()
816 .set_configuration(&FakeLinkDeviceId, TransmitQueueConfiguration::None);
817
818 let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
819 let FakeTxQueueBindingsCtxState { woken_tx_tasks, delivered_to_sockets } =
820 &bindings_ctx.state;
821 assert_matches!(&woken_tx_tasks[..], &[]);
822 assert_eq!(
823 delivered_to_sockets,
824 &[Frame::Sent(fake_sent_ethernet_with_body(body.as_ref().into()))]
825 );
826 if no_buffers {
827 assert_eq!(core_ctx.state.transmitted_packets, []);
828 } else {
829 assert_eq!(core::mem::take(&mut core_ctx.state.transmitted_packets), [(body, None)]);
830 }
831 }
832
833 #[test]
834 fn count() {
835 let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());
836 assert_eq!(ctx.transmit_queue_api().count(&FakeLinkDeviceId), None);
837
838 ctx.transmit_queue_api()
839 .set_configuration(&FakeLinkDeviceId, TransmitQueueConfiguration::Fifo);
840
841 assert_eq!(ctx.transmit_queue_api().count(&FakeLinkDeviceId), Some(0));
842
843 let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
844 let body = Buf::new(vec![0], ..);
845 assert_eq!(
846 TransmitQueueHandler::queue_tx_frame(
847 core_ctx,
848 bindings_ctx,
849 &FakeLinkDeviceId,
850 FakeTxMetadata,
851 body,
852 ),
853 Ok(1)
854 );
855
856 assert_eq!(ctx.transmit_queue_api().count(&FakeLinkDeviceId), Some(1));
857 }
858}