1use core::convert::Infallible as Never;
8
9use alloc::vec::Vec;
10
11use derivative::Derivative;
12use log::trace;
13use netstack3_base::sync::Mutex;
14use netstack3_base::{
15 ChecksumOffloadSpec, Device, DeviceIdContext, ErrorAndSerializer, NetworkSerializationContext,
16 NetworkSerializer,
17};
18use packet::{Buf, FragmentedBuffer as _, NoReuseBufferProvider, ReusableBuffer, new_buf_vec};
19
20use crate::internal::base::{DeviceBufferBindingsTypes, DeviceSendFrameError};
21use crate::internal::queue::{
22 DequeueState, DeviceBufferSpec, EnqueueResult, TransmitQueueFrameError, fifo,
23};
24use crate::internal::socket::{DeviceSocketHandler, ParseSentFrameError, SentFrame};
25
26#[derive(Derivative)]
28#[derivative(Default(bound = "Allocator: Default"))]
29pub struct TransmitQueueState<Meta, Buffer, Allocator> {
30 pub(super) allocator: ByteSliceAllocator<Allocator, Buffer>,
31 pub(super) queue: Option<fifo::Queue<Meta, Buffer>>,
32 pub(super) checksum_offload_spec: ChecksumOffloadSpec,
33}
34
35pub struct TransmitQueue<Meta, Buffer, Allocator> {
37 pub(crate) deque: Mutex<DequeueState<Meta, Buffer>>,
41 pub(crate) queue: Mutex<TransmitQueueState<Meta, Buffer, Allocator>>,
46}
47
48impl<Meta, Buffer, Allocator> TransmitQueue<Meta, Buffer, Allocator> {
49 pub(crate) fn new(allocator: Allocator, checksum_offload_spec: ChecksumOffloadSpec) -> Self {
50 Self {
51 deque: Mutex::new(DequeueState::default()),
52 queue: Mutex::new(TransmitQueueState {
53 allocator: ByteSliceAllocator::new(allocator),
54 queue: None,
55 checksum_offload_spec,
56 }),
57 }
58 }
59}
60
61pub trait TransmitQueueBindingsContext<DeviceId>: DeviceBufferBindingsTypes {
63 fn wake_tx_task(&mut self, device_id: &DeviceId);
70}
71
72pub trait TransmitQueueCommon<D: Device, C>: DeviceIdContext<D> {
74 type Meta;
76
77 type DequeueContext;
79
80 fn parse_outgoing_frame<'a, 'b>(
82 buf: &'a [u8],
83 meta: &'a Self::Meta,
84 ) -> Result<SentFrame<&'a [u8]>, ParseSentFrameError>;
85}
86
87pub trait TransmitQueueContext<D: DeviceBufferSpec<BC>, BC>: TransmitQueueCommon<D, BC> {
89 fn with_transmit_queue_mut<
91 O,
92 F: FnOnce(&mut TransmitQueueState<Self::Meta, D::TxBuffer, D::TxAllocator>) -> O,
93 >(
94 &mut self,
95 device_id: &Self::DeviceId,
96 cb: F,
97 ) -> O;
98
99 fn with_transmit_queue<
101 O,
102 F: FnOnce(&TransmitQueueState<Self::Meta, D::TxBuffer, D::TxAllocator>) -> O,
103 >(
104 &mut self,
105 device_id: &Self::DeviceId,
106 cb: F,
107 ) -> O;
108
109 fn send_frame(
114 &mut self,
115 bindings_ctx: &mut BC,
116 device_id: &Self::DeviceId,
117 dequeue_context: Option<&mut Self::DequeueContext>,
118 meta: Self::Meta,
119 buf: D::TxBuffer,
120 ) -> Result<(), DeviceSendFrameError>;
121}
122
123pub trait TransmitDequeueContext<D: DeviceBufferSpec<BC>, BC>: TransmitQueueContext<D, BC> {
125 type TransmitQueueCtx<'a>: TransmitQueueContext<
127 D,
128 BC,
129 Meta = Self::Meta,
130 DequeueContext = Self::DequeueContext,
131 DeviceId = Self::DeviceId,
132 > + DeviceSocketHandler<D, BC>;
133
134 fn with_dequed_packets_and_tx_queue_ctx<
136 O,
137 F: FnOnce(&mut DequeueState<Self::Meta, D::TxBuffer>, &mut Self::TransmitQueueCtx<'_>) -> O,
138 >(
139 &mut self,
140 device_id: &Self::DeviceId,
141 cb: F,
142 ) -> O;
143}
144
145pub enum TransmitQueueConfiguration {
147 None,
149 Fifo,
151}
152
153pub trait TransmitQueueHandler<D: Device, BC>: TransmitQueueCommon<D, BC> {
155 fn queue_tx_frame<S>(
157 &mut self,
158 bindings_ctx: &mut BC,
159 device_id: &Self::DeviceId,
160 meta: Self::Meta,
161 body: S,
162 ) -> Result<usize, TransmitQueueFrameError<S>>
163 where
164 S: NetworkSerializer,
165 S::Buffer: ReusableBuffer;
166}
167
168pub(super) fn deliver_to_device_sockets<
169 D: DeviceBufferSpec<BC>,
170 BC: TransmitQueueBindingsContext<CC::DeviceId>,
171 CC: TransmitQueueCommon<D, BC> + DeviceSocketHandler<D, BC>,
172>(
173 core_ctx: &mut CC,
174 bindings_ctx: &mut BC,
175 device_id: &CC::DeviceId,
176 buffer: &D::TxBuffer,
177 meta: &CC::Meta,
178) {
179 buffer.with_bytes(|b| {
180 let mut handle_parsed = |bytes: &[u8]| match CC::parse_outgoing_frame(bytes, meta) {
181 Ok(sent_frame) => DeviceSocketHandler::handle_frame(
182 core_ctx,
183 bindings_ctx,
184 device_id,
185 sent_frame.into(),
186 bytes,
187 ),
188 Err(ParseSentFrameError) => {
189 trace!("failed to parse outgoing frame on {:?} ({} bytes)", device_id, bytes.len())
190 }
191 };
192
193 if let Some(bytes) = b.try_get_contiguous() {
194 handle_parsed(bytes)
195 } else {
196 handle_parsed(&b.to_flattened_vec())
197 }
198 })
199}
200
201impl EnqueueResult {
202 fn maybe_wake_tx<D, BC: TransmitQueueBindingsContext<D>>(
203 self,
204 bindings_ctx: &mut BC,
205 device_id: &D,
206 ) {
207 match self {
208 Self::QueuePreviouslyWasOccupied => (),
209 Self::QueueWasPreviouslyEmpty => bindings_ctx.wake_tx_task(device_id),
210 }
211 }
212}
213
214enum EnqueueStatus<Meta, Buffer> {
215 NotAttempted(Meta, Buffer),
216 Attempted,
217}
218
219fn insert_and_notify<
222 D: DeviceBufferSpec<BC>,
223 BC: TransmitQueueBindingsContext<CC::DeviceId>,
224 CC: TransmitQueueContext<D, BC> + DeviceSocketHandler<D, BC>,
225>(
226 bindings_ctx: &mut BC,
227 device_id: &CC::DeviceId,
228 inserter: Option<fifo::QueueTxInserter<'_, CC::Meta, D::TxBuffer>>,
229 meta: CC::Meta,
230 body: D::TxBuffer,
231) -> EnqueueStatus<CC::Meta, D::TxBuffer> {
232 match inserter {
233 None => EnqueueStatus::NotAttempted(meta, body),
235 Some(inserter) => {
236 inserter.insert(meta, body).maybe_wake_tx(bindings_ctx, device_id);
237 EnqueueStatus::Attempted
238 }
239 }
240}
241
242fn handle_post_enqueue<
245 D: DeviceBufferSpec<BC>,
246 BC: TransmitQueueBindingsContext<CC::DeviceId>,
247 CC: TransmitQueueContext<D, BC> + DeviceSocketHandler<D, BC>,
248>(
249 core_ctx: &mut CC,
250 bindings_ctx: &mut BC,
251 device_id: &CC::DeviceId,
252 status: EnqueueStatus<CC::Meta, D::TxBuffer>,
253) -> Result<(), DeviceSendFrameError> {
254 match status {
255 EnqueueStatus::NotAttempted(meta, body) => {
256 deliver_to_device_sockets(core_ctx, bindings_ctx, device_id, &body, &meta);
259 core_ctx.send_frame(bindings_ctx, device_id, None, meta, body)
262 }
263 EnqueueStatus::Attempted => Ok(()),
264 }
265}
266
267impl<
268 D: DeviceBufferSpec<BC>,
269 BC: TransmitQueueBindingsContext<CC::DeviceId>,
270 CC: TransmitQueueContext<D, BC> + DeviceSocketHandler<D, BC>,
271> TransmitQueueHandler<D, BC> for CC
272{
273 fn queue_tx_frame<S>(
274 &mut self,
275 bindings_ctx: &mut BC,
276 device_id: &CC::DeviceId,
277 meta: CC::Meta,
278 body: S,
279 ) -> Result<usize, TransmitQueueFrameError<S>>
280 where
281 S: NetworkSerializer,
282 S::Buffer: ReusableBuffer,
283 {
284 let (len, result) = self.with_transmit_queue_mut(
285 device_id,
286 |TransmitQueueState { allocator, queue, checksum_offload_spec }| {
287 let queue_len = queue.as_ref().map_or(0, |q| q.len());
288 let inserter = match queue {
289 None => None,
290 Some(q) => match q.tx_inserter() {
291 Some(i) => Some(i),
292 None => return Err(TransmitQueueFrameError::QueueFull(body)),
293 },
294 };
295 let mut context = NetworkSerializationContext::new(checksum_offload_spec.clone());
296 let body = body
297 .serialize_outer(
298 &mut context,
299 NoReuseBufferProvider(BufferAllocAdaptor {
300 allocator: &mut *allocator,
301 queue_len,
302 }),
303 )
304 .map_err(|(e, serializer)| {
305 TransmitQueueFrameError::SerializeError(ErrorAndSerializer {
306 serializer,
307 error: e.map_alloc(
308 |_: <D::TxAllocator as TxBufferAllocator<D::TxBuffer>>::Error| (),
309 ),
310 })
311 })?;
312 let len = body.len();
316 let result = insert_and_notify::<_, _, CC>(
317 bindings_ctx,
318 device_id,
319 inserter,
320 meta,
321 allocator.get_buffer(),
322 );
323 Ok((len, result))
324 },
325 )?;
326
327 handle_post_enqueue(self, bindings_ctx, device_id, result)
328 .map(|()| len)
329 .map_err(TransmitQueueFrameError::NoQueue)
330 }
331}
332
333pub trait TxBufferAllocator<Buffer> {
335 type Error;
337
338 fn alloc(&mut self, len: usize, queue_len: usize) -> Result<Buffer, Self::Error>;
341}
342
343#[derive(Derivative)]
348#[derivative(Default(bound = "A: Default"))]
349pub(super) struct ByteSliceAllocator<A, B> {
350 allocator: A,
351 buffer: Option<B>,
352}
353
354impl<A, B> ByteSliceAllocator<A, B> {
355 fn new(allocator: A) -> Self {
356 Self { allocator, buffer: None }
357 }
358}
359
360impl<A: TxBufferAllocator<B>, B: AsMut<[u8]>> ByteSliceAllocator<A, B> {
361 fn alloc_buf_slice(
363 &mut self,
364 len: usize,
365 queue_len: usize,
366 ) -> Result<Buf<&mut [u8]>, A::Error> {
367 let old = self.buffer.replace(self.allocator.alloc(len, queue_len)?);
368 debug_assert!(old.is_none());
369 Ok(Buf::new(&mut self.buffer.as_mut().expect("must be set").as_mut()[..len], ..))
372 }
373
374 fn get_buffer(&mut self) -> B {
377 self.buffer.take().expect("must be allocated")
378 }
379}
380
381struct BufferAllocAdaptor<'a, A, B> {
382 allocator: &'a mut ByteSliceAllocator<A, B>,
383 queue_len: usize,
384}
385
386impl<'a, A, B> packet::BufferAlloc<Buf<&'a mut [u8]>> for BufferAllocAdaptor<'a, A, B>
387where
388 B: AsMut<[u8]>,
389 A: TxBufferAllocator<B>,
390{
391 type Error = A::Error;
392
393 fn alloc(self, len: usize) -> Result<Buf<&'a mut [u8]>, Self::Error> {
394 self.allocator.alloc_buf_slice(len, self.queue_len)
395 }
396}
397
398#[derive(Default)]
400pub struct BufVecU8Allocator;
401
402impl TxBufferAllocator<Buf<Vec<u8>>> for BufVecU8Allocator {
403 type Error = Never;
404
405 fn alloc(&mut self, len: usize, _qlen: usize) -> Result<Buf<Vec<u8>>, Self::Error> {
406 new_buf_vec(len)
407 }
408}
409
410#[cfg(test)]
411mod tests {
412 use super::*;
413
414 use alloc::vec;
415
416 use assert_matches::assert_matches;
417 use net_declare::net_mac;
418 use net_types::ethernet::Mac;
419 use netstack3_base::testutil::{
420 FakeBindingsCtx, FakeCoreCtx, FakeLinkDevice, FakeLinkDeviceId,
421 };
422
423 impl<BT> DeviceBufferSpec<BT> for FakeLinkDevice {
424 type TxBuffer = packet::Buf<Vec<u8>>;
425 type TxAllocator = BufVecU8Allocator;
426 }
427 use netstack3_base::{
428 ContextPair, CounterContext, CtxPair, ResourceCounterContext, WorkQueueReport,
429 };
430 use test_case::test_case;
431
432 use crate::DeviceCounters;
433 use crate::internal::queue::api::TransmitQueueApi;
434 use crate::internal::queue::{BatchSize, MAX_TX_QUEUED_LEN};
435 use crate::internal::socket::{EthernetFrame, Frame};
436
437 #[derive(Default)]
438 struct FakeTxQueueState {
439 queue: TransmitQueueState<(), Buf<Vec<u8>>, BufVecU8Allocator>,
440 transmitted_packets: Vec<(Buf<Vec<u8>>, Option<DequeueContext>)>,
441 no_buffers: bool,
442 stack_wide_device_counters: DeviceCounters,
443 per_device_counters: DeviceCounters,
444 }
445
446 #[derive(Default)]
447 struct FakeTxQueueBindingsCtxState {
448 woken_tx_tasks: Vec<FakeLinkDeviceId>,
449 delivered_to_sockets: Vec<Frame<Vec<u8>>>,
450 }
451
452 type FakeCoreCtxImpl = FakeCoreCtx<FakeTxQueueState, (), FakeLinkDeviceId>;
453 type FakeBindingsCtxImpl = FakeBindingsCtx<(), (), FakeTxQueueBindingsCtxState, ()>;
454
455 impl TransmitQueueBindingsContext<FakeLinkDeviceId> for FakeBindingsCtxImpl {
456 fn wake_tx_task(&mut self, device_id: &FakeLinkDeviceId) {
457 self.state.woken_tx_tasks.push(device_id.clone())
458 }
459 }
460
461 const SRC_MAC: Mac = net_mac!("AA:BB:CC:DD:EE:FF");
462 const DEST_MAC: Mac = net_mac!("FF:EE:DD:CC:BB:AA");
463
464 #[derive(Copy, Clone, Debug, Eq, PartialEq)]
465 struct DequeueContext;
466
467 impl TransmitQueueCommon<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
468 type DequeueContext = DequeueContext;
469 type Meta = ();
470
471 fn parse_outgoing_frame<'a, 'b>(
472 buf: &'a [u8],
473 (): &'b Self::Meta,
474 ) -> Result<SentFrame<&'a [u8]>, ParseSentFrameError> {
475 Ok(fake_sent_ethernet_with_body(buf))
476 }
477 }
478
479 fn fake_sent_ethernet_with_body<B>(body: B) -> SentFrame<B> {
480 SentFrame::Ethernet(EthernetFrame {
481 src_mac: SRC_MAC,
482 dst_mac: DEST_MAC,
483 ethertype: None,
484 body_offset: 0,
485 body,
486 })
487 }
488
489 trait TransmitQueueApiExt: ContextPair + Sized {
491 fn transmit_queue_api<D>(&mut self) -> TransmitQueueApi<D, &mut Self> {
492 TransmitQueueApi::new(self)
493 }
494 }
495
496 impl<O> TransmitQueueApiExt for O where O: ContextPair + Sized {}
497
498 impl TransmitQueueContext<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
499 fn with_transmit_queue<
500 O,
501 F: FnOnce(&TransmitQueueState<(), Buf<Vec<u8>>, BufVecU8Allocator>) -> O,
502 >(
503 &mut self,
504 &FakeLinkDeviceId: &FakeLinkDeviceId,
505 cb: F,
506 ) -> O {
507 cb(&self.state.queue)
508 }
509
510 fn with_transmit_queue_mut<
511 O,
512 F: FnOnce(&mut TransmitQueueState<(), Buf<Vec<u8>>, BufVecU8Allocator>) -> O,
513 >(
514 &mut self,
515 &FakeLinkDeviceId: &FakeLinkDeviceId,
516 cb: F,
517 ) -> O {
518 cb(&mut self.state.queue)
519 }
520
521 fn send_frame(
522 &mut self,
523 _bindings_ctx: &mut FakeBindingsCtxImpl,
524 &FakeLinkDeviceId: &FakeLinkDeviceId,
525 dequeue_context: Option<&mut DequeueContext>,
526 (): (),
527 buf: Buf<Vec<u8>>,
528 ) -> Result<(), DeviceSendFrameError> {
529 let FakeTxQueueState { transmitted_packets, no_buffers, .. } = &mut self.state;
530 if *no_buffers {
531 Err(DeviceSendFrameError::NoBuffers)
532 } else {
533 Ok(transmitted_packets.push((buf, dequeue_context.map(|c| *c))))
534 }
535 }
536 }
537
538 impl ResourceCounterContext<FakeLinkDeviceId, DeviceCounters> for FakeCoreCtxImpl {
539 fn per_resource_counters<'a>(
540 &'a self,
541 _resource: &'a FakeLinkDeviceId,
542 ) -> &'a DeviceCounters {
543 &self.state.per_device_counters
544 }
545 }
546
547 impl CounterContext<DeviceCounters> for FakeCoreCtxImpl {
548 fn counters(&self) -> &DeviceCounters {
549 &self.state.stack_wide_device_counters
550 }
551 }
552
553 impl TransmitDequeueContext<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
554 type TransmitQueueCtx<'a> = Self;
555
556 fn with_dequed_packets_and_tx_queue_ctx<
557 O,
558 F: FnOnce(&mut DequeueState<(), Buf<Vec<u8>>>, &mut Self::TransmitQueueCtx<'_>) -> O,
559 >(
560 &mut self,
561 &FakeLinkDeviceId: &FakeLinkDeviceId,
562 cb: F,
563 ) -> O {
564 cb(&mut DequeueState::default(), self)
565 }
566 }
567
568 impl DeviceSocketHandler<FakeLinkDevice, FakeBindingsCtxImpl> for FakeCoreCtxImpl {
569 fn handle_frame(
570 &mut self,
571 bindings_ctx: &mut FakeBindingsCtxImpl,
572 _device: &Self::DeviceId,
573 frame: Frame<&[u8]>,
574 _whole_frame: &[u8],
575 ) {
576 bindings_ctx.state.delivered_to_sockets.push(frame.cloned())
577 }
578 }
579
580 #[test]
581 fn noqueue() {
582 let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());
583
584 let body = Buf::new(vec![0], ..);
585
586 let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
587 assert_eq!(
588 TransmitQueueHandler::queue_tx_frame(
589 core_ctx,
590 bindings_ctx,
591 &FakeLinkDeviceId,
592 (),
593 body.clone(),
594 ),
595 Ok(body.len())
596 );
597 let FakeTxQueueBindingsCtxState { woken_tx_tasks, delivered_to_sockets } =
598 &bindings_ctx.state;
599 assert_matches!(&woken_tx_tasks[..], &[]);
600 assert_eq!(
601 delivered_to_sockets,
602 &[Frame::Sent(fake_sent_ethernet_with_body(body.as_ref().into()))]
603 );
604 assert_eq!(core::mem::take(&mut core_ctx.state.transmitted_packets), [(body, None)]);
605
606 assert_eq!(
609 ctx.transmit_queue_api().transmit_queued_frames(
610 &FakeLinkDeviceId,
611 BatchSize::default(),
612 &mut DequeueContext,
613 ),
614 Ok(WorkQueueReport::AllDone),
615 );
616
617 let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
618 assert_matches!(&bindings_ctx.state.woken_tx_tasks[..], &[]);
619 assert_eq!(core::mem::take(&mut core_ctx.state.transmitted_packets), []);
620 }
621
622 #[test_case(BatchSize::MAX)]
623 #[test_case(BatchSize::MAX/2)]
624 fn fifo_queue_and_dequeue(batch_size: usize) {
625 let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());
626
627 ctx.transmit_queue_api()
628 .set_configuration(&FakeLinkDeviceId, TransmitQueueConfiguration::Fifo);
629
630 for _ in 0..2 {
631 let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
632 for i in 0..MAX_TX_QUEUED_LEN {
633 let body = Buf::new(vec![i as u8], ..);
634 assert_eq!(
635 TransmitQueueHandler::queue_tx_frame(
636 core_ctx,
637 bindings_ctx,
638 &FakeLinkDeviceId,
639 (),
640 body
641 ),
642 Ok(1)
643 );
644 assert_eq!(bindings_ctx.state.woken_tx_tasks, [FakeLinkDeviceId]);
647 }
648
649 let body = Buf::new(vec![131], ..);
650 assert_eq!(
651 TransmitQueueHandler::queue_tx_frame(
652 core_ctx,
653 bindings_ctx,
654 &FakeLinkDeviceId,
655 (),
656 body.clone(),
657 ),
658 Err(TransmitQueueFrameError::QueueFull(body))
659 );
660
661 let FakeTxQueueBindingsCtxState { woken_tx_tasks, delivered_to_sockets } =
662 &mut bindings_ctx.state;
663 assert_eq!(core::mem::take(woken_tx_tasks), [FakeLinkDeviceId]);
666 assert_eq!(core::mem::take(delivered_to_sockets), &[]);
668
669 assert!(MAX_TX_QUEUED_LEN > batch_size);
670 for i in (0..(MAX_TX_QUEUED_LEN - batch_size)).step_by(batch_size) {
671 assert_eq!(
672 ctx.transmit_queue_api().transmit_queued_frames(
673 &FakeLinkDeviceId,
674 BatchSize::new_saturating(batch_size),
675 &mut DequeueContext
676 ),
677 Ok(WorkQueueReport::Pending),
678 );
679 assert_eq!(
680 core::mem::take(&mut ctx.core_ctx.state.transmitted_packets),
681 (i..i + batch_size)
682 .map(|i| (Buf::new(vec![i as u8], ..), Some(DequeueContext)))
683 .collect::<Vec<_>>()
684 );
685 }
686
687 assert_eq!(
688 ctx.transmit_queue_api().transmit_queued_frames(
689 &FakeLinkDeviceId,
690 BatchSize::new_saturating(batch_size),
691 &mut DequeueContext
692 ),
693 Ok(WorkQueueReport::AllDone),
694 );
695
696 let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
697 assert_eq!(
698 core::mem::take(&mut core_ctx.state.transmitted_packets),
699 (batch_size * (MAX_TX_QUEUED_LEN / batch_size - 1)..MAX_TX_QUEUED_LEN)
700 .map(|i| (Buf::new(vec![i as u8], ..), Some(DequeueContext)))
701 .collect::<Vec<_>>()
702 );
703 let FakeTxQueueBindingsCtxState { woken_tx_tasks, delivered_to_sockets } =
706 &mut bindings_ctx.state;
707 assert_matches!(&core::mem::take(woken_tx_tasks)[..], &[]);
708
709 assert_eq!(
712 core::mem::take(delivered_to_sockets),
713 (0..MAX_TX_QUEUED_LEN)
714 .map(|i| Frame::Sent(fake_sent_ethernet_with_body(vec![i as u8])))
715 .collect::<Vec<_>>()
716 );
717 }
718 }
719
720 #[test]
721 fn dequeue_error() {
722 let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());
723
724 ctx.transmit_queue_api()
725 .set_configuration(&FakeLinkDeviceId, TransmitQueueConfiguration::Fifo);
726
727 let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
728 let body = Buf::new(vec![0], ..);
729 assert_eq!(
730 TransmitQueueHandler::queue_tx_frame(
731 core_ctx,
732 bindings_ctx,
733 &FakeLinkDeviceId,
734 (),
735 body.clone(),
736 ),
737 Ok(body.len())
738 );
739 assert_eq!(core::mem::take(&mut bindings_ctx.state.woken_tx_tasks), [FakeLinkDeviceId]);
740 assert_eq!(core_ctx.state.transmitted_packets, []);
741
742 core_ctx.state.no_buffers = true;
743 assert_eq!(
744 ctx.transmit_queue_api().transmit_queued_frames(
745 &FakeLinkDeviceId,
746 BatchSize::default(),
747 &mut DequeueContext
748 ),
749 Err(DeviceSendFrameError::NoBuffers),
750 );
751 let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
752 assert_eq!(core_ctx.state.transmitted_packets, []);
753 let FakeTxQueueBindingsCtxState { woken_tx_tasks, delivered_to_sockets } =
754 &bindings_ctx.state;
755 assert_matches!(&woken_tx_tasks[..], &[]);
756 assert_eq!(
759 delivered_to_sockets,
760 &[Frame::Sent(fake_sent_ethernet_with_body(body.as_ref().into()))]
761 );
762
763 core_ctx.state.no_buffers = false;
764 assert_eq!(
765 ctx.transmit_queue_api().transmit_queued_frames(
766 &FakeLinkDeviceId,
767 BatchSize::default(),
768 &mut DequeueContext
769 ),
770 Ok(WorkQueueReport::AllDone),
771 );
772 let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
773 assert_matches!(&bindings_ctx.state.woken_tx_tasks[..], &[]);
774 assert_eq!(core::mem::take(&mut core_ctx.state.transmitted_packets), []);
776 }
777
778 #[test_case(true; "device no buffers")]
779 #[test_case(false; "device has buffers")]
780 fn drain_before_noqueue(no_buffers: bool) {
781 let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());
782
783 ctx.transmit_queue_api()
784 .set_configuration(&FakeLinkDeviceId, TransmitQueueConfiguration::Fifo);
785
786 let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
787 let body = Buf::new(vec![0], ..);
788 assert_eq!(
789 TransmitQueueHandler::queue_tx_frame(
790 core_ctx,
791 bindings_ctx,
792 &FakeLinkDeviceId,
793 (),
794 body.clone(),
795 ),
796 Ok(body.len())
797 );
798 assert_eq!(core::mem::take(&mut bindings_ctx.state.woken_tx_tasks), [FakeLinkDeviceId]);
799 assert_eq!(core_ctx.state.transmitted_packets, []);
800
801 core_ctx.state.no_buffers = no_buffers;
802 ctx.transmit_queue_api()
803 .set_configuration(&FakeLinkDeviceId, TransmitQueueConfiguration::None);
804
805 let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
806 let FakeTxQueueBindingsCtxState { woken_tx_tasks, delivered_to_sockets } =
807 &bindings_ctx.state;
808 assert_matches!(&woken_tx_tasks[..], &[]);
809 assert_eq!(
810 delivered_to_sockets,
811 &[Frame::Sent(fake_sent_ethernet_with_body(body.as_ref().into()))]
812 );
813 if no_buffers {
814 assert_eq!(core_ctx.state.transmitted_packets, []);
815 } else {
816 assert_eq!(core::mem::take(&mut core_ctx.state.transmitted_packets), [(body, None)]);
817 }
818 }
819
820 #[test]
821 fn count() {
822 let mut ctx = CtxPair::with_core_ctx(FakeCoreCtxImpl::default());
823 assert_eq!(ctx.transmit_queue_api().count(&FakeLinkDeviceId), None);
824
825 ctx.transmit_queue_api()
826 .set_configuration(&FakeLinkDeviceId, TransmitQueueConfiguration::Fifo);
827
828 assert_eq!(ctx.transmit_queue_api().count(&FakeLinkDeviceId), Some(0));
829
830 let CtxPair { core_ctx, bindings_ctx } = &mut ctx;
831 let body = Buf::new(vec![0], ..);
832 assert_eq!(
833 TransmitQueueHandler::queue_tx_frame(
834 core_ctx,
835 bindings_ctx,
836 &FakeLinkDeviceId,
837 (),
838 body,
839 ),
840 Ok(1)
841 );
842
843 assert_eq!(ctx.transmit_queue_api().count(&FakeLinkDeviceId), Some(1));
844 }
845}