1mod buffer;
8
9use std::fmt::Debug;
10use std::mem::MaybeUninit;
11use std::num::{NonZeroU16, NonZeroU32, NonZeroU64, TryFromIntError};
12use std::ops::Range;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::sync::atomic::{self, AtomicUsize};
16use std::task::Waker;
17
18use explicit::{PollExt as _, ResultExt as _};
19use fidl_fuchsia_hardware_network as netdev;
20use fidl_fuchsia_hardware_network::DelegatedRxLease;
21use fidl_table_validation::ValidFidlTable;
22use fuchsia_async as fasync;
23use fuchsia_sync::Mutex;
24use futures::future::{Future, poll_fn};
25use futures::task::{Context, Poll};
26use futures::{Stream, StreamExt as _, ready};
27
28use crate::error::{Error, Result};
29use buffer::pool::{Pool, RxLeaseWatcher};
30use buffer::{
31 AllocKind, DescId, NETWORK_DEVICE_DESCRIPTOR_LENGTH, NETWORK_DEVICE_DESCRIPTOR_VERSION,
32};
33pub use buffer::{Buffer, Rx, Tx};
34
/// A client handle to an open netdevice session.
///
/// Cheap to clone; all clones share the same underlying state with the
/// [`Task`] returned from [`Session::new`], which must be polled for the
/// session to make progress.
#[derive(Clone)]
pub struct Session {
    // Shared with the driving `Task`.
    inner: Arc<Inner>,
}
40
41impl Debug for Session {
42 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43 let Self { inner } = self;
44 let Inner {
45 name,
46 pool: _,
47 proxy: _,
48 rx: _,
49 tx: _,
50 tx_pending: _,
51 rx_ready: _,
52 tx_ready: _,
53 tx_idle_listeners: _,
54 } = &**inner;
55 f.debug_struct("Session").field("name", &name).finish_non_exhaustive()
56 }
57}
58
impl Session {
    /// Opens a new session named `name` on `device`, configured by `config`.
    ///
    /// Returns the session handle together with a [`Task`] future that drives
    /// descriptor exchange over the FIFOs; the session makes no progress
    /// unless that task is polled.
    pub async fn new(
        device: &netdev::DeviceProxy,
        name: &str,
        config: Config,
    ) -> Result<(Self, Task)> {
        let inner = Inner::new(device, name, config).await?;
        Ok((Session { inner: Arc::clone(&inner) }, Task { inner }))
    }

    /// Enqueues `buffer` for transmission.
    ///
    /// The descriptor is handed to the pending-TX queue; the [`Task`] writes
    /// it to the TX FIFO when polled.
    pub fn send(&self, buffer: Buffer<Tx>) -> Result<()> {
        self.inner.send(buffer)
    }

    /// Waits for and returns the next received frame.
    pub async fn recv(&self) -> Result<Buffer<Rx>> {
        self.inner.recv().await
    }

    /// Allocates a TX buffer with room for at least `num_bytes`, waiting for
    /// one to become available if necessary.
    pub async fn alloc_tx_buffer(&self, num_bytes: usize) -> Result<Buffer<Tx>> {
        self.inner.pool.alloc_tx_buffer(num_bytes).await
    }

    /// Allocates a batch of TX buffers, each with room for at least
    /// `num_bytes`; waits until allocation is possible, then yields the
    /// buffers through the returned iterator.
    pub async fn alloc_tx_buffers(
        &self,
        num_bytes: usize,
    ) -> Result<impl Iterator<Item = Result<Buffer<Tx>>> + '_> {
        self.inner.pool.alloc_tx_buffers(num_bytes).await
    }

    /// Attaches the session to `port`, subscribing to the given RX frame
    /// types.
    pub async fn attach(&self, port: Port, rx_frames: &[netdev::FrameType]) -> Result<()> {
        let fut = self.inner.proxy.attach(&port.into(), rx_frames);
        let () = fut.await?.map_err(|raw| Error::Attach(port, zx::Status::from_raw(raw)))?;
        Ok(())
    }

    /// Detaches the session from `port`.
    pub async fn detach(&self, port: Port) -> Result<()> {
        let () = self
            .inner
            .proxy
            .detach(&port.into())
            .await?
            .map_err(|raw| Error::Detach(port, zx::Status::from_raw(raw)))?;
        Ok(())
    }

    /// Resolves once no TX frames are in flight (every frame handed to
    /// [`Session::send`] has been completed by the device).
    pub async fn wait_tx_idle(&self) {
        self.inner.tx_idle_listeners.wait().await;
    }

    /// Returns a stream of RX power leases delegated by the device.
    ///
    /// Each lease is yielded only after the RX path has observed the frame the
    /// lease is held for (`hold_until_frame`). The stream ends cleanly when
    /// the session channel closes; other FIDL errors are surfaced.
    pub fn watch_rx_leases(&self) -> impl Stream<Item = Result<RxLease>> + Send + Sync + use<> {
        let inner = Arc::clone(&self.inner);
        let watcher = RxLeaseWatcher::new(Arc::clone(&inner.pool));
        futures::stream::try_unfold((inner, watcher), |(inner, mut watcher)| async move {
            let DelegatedRxLease {
                hold_until_frame,
                handle,
                __source_breaking: fidl::marker::SourceBreaking,
            } = match inner.proxy.watch_delegated_rx_lease().await {
                Ok(lease) => lease,
                Err(e) => {
                    if e.is_closed() {
                        // Channel closure terminates the stream without error.
                        return Ok(None);
                    } else {
                        return Err(Error::Fidl(e));
                    }
                }
            };
            let hold_until_frame = hold_until_frame.ok_or(Error::InvalidLease)?;
            let handle = RxLease { handle: handle.ok_or(Error::InvalidLease)? };

            // Delay yielding the lease until the target frame was processed.
            watcher.wait_until(hold_until_frame).await;
            Ok(Some((handle, (inner, watcher))))
        })
    }

    /// Requests that the server close the session, then waits for the
    /// acknowledgment in the form of the channel being closed.
    pub async fn close(&self) -> Result<()> {
        self.inner.proxy.close()?;
        let mut event_stream = self.inner.proxy.take_event_stream();
        // Drain events until closure; `ClientChannelClosed` is the expected
        // terminal condition, any other FIDL error is propagated.
        while let Some(event) = event_stream.next().await {
            match event {
                Err(fidl::Error::ClientChannelClosed { .. }) => break,
                Ok(_) => {} Err(e) => return Err(Error::Fidl(e)),
            }
        }
        Ok(())
    }
}
201
/// State shared between a [`Session`] handle and its driving [`Task`].
struct Inner {
    /// Buffer pool backing all TX and RX buffers.
    pool: Arc<Pool>,
    /// FIDL connection to the device-side session.
    proxy: netdev::SessionProxy,
    /// Session name; used for `Debug` output.
    name: String,
    /// FIFO carrying RX descriptors to and from the device.
    rx: fasync::Fifo<DescId<Rx>>,
    /// FIFO carrying TX descriptors to and from the device.
    tx: fasync::Fifo<DescId<Tx>>,
    /// TX descriptors queued by `send` but not yet written to the TX FIFO.
    tx_pending: Pending<Tx>,
    /// Batches of completed RX descriptors read from the RX FIFO.
    rx_ready: Mutex<ReadyBuffer<DescId<Rx>>>,
    /// Batches of completed TX descriptors read from the TX FIFO.
    tx_ready: Mutex<ReadyBuffer<DescId<Tx>>>,
    /// In-flight TX accounting for [`Session::wait_tx_idle`].
    tx_idle_listeners: TxIdleListeners,
}
214
impl Inner {
    /// Creates the shared session state: allocates the pool, sends the
    /// session configuration to `device` over FIDL, and wraps the FIFOs the
    /// device returns.
    async fn new(device: &netdev::DeviceProxy, name: &str, config: Config) -> Result<Arc<Self>> {
        let (pool, descriptors, data) = Pool::new(config)?;

        let session_info = {
            // The FIDL field counts descriptor length in 64-bit words.
            let descriptor_length =
                u8::try_from(NETWORK_DEVICE_DESCRIPTOR_LENGTH / std::mem::size_of::<u64>())
                    .expect("descriptor length in 64-bit words not representable by u8");
            let data = vec![fidl_fuchsia_hardware_network::DataVmo {
                id: Some(0),
                vmo: Some(data),
                num_rx_buffers: Some(config.num_rx_buffers.get()),
                __source_breaking: fidl::marker::SourceBreaking,
            }];
            netdev::SessionInfo {
                descriptors: Some(descriptors),
                data: Some(data),
                descriptor_version: Some(NETWORK_DEVICE_DESCRIPTOR_VERSION),
                descriptor_length: Some(descriptor_length),
                descriptor_count: Some(config.num_tx_buffers.get() + config.num_rx_buffers.get()),
                options: Some(config.options),
                ..Default::default()
            }
        };

        let (client, netdev::Fifos { rx, tx }) = device
            .open_session(name, session_info)
            .await?
            .map_err(|raw| Error::Open(name.to_owned(), zx::Status::from_raw(raw)))?;
        let proxy = client.into_proxy();
        let rx = fasync::Fifo::from_fifo(rx);
        let tx = fasync::Fifo::from_fifo(tx);

        Ok(Arc::new(Self {
            pool,
            proxy,
            name: name.to_owned(),
            rx,
            tx,
            tx_pending: Pending::new(Vec::new()),
            rx_ready: Mutex::new(ReadyBuffer::new(config.num_rx_buffers.get().into())),
            tx_ready: Mutex::new(ReadyBuffer::new(config.num_tx_buffers.get().into())),
            tx_idle_listeners: TxIdleListeners::new(),
        }))
    }

    /// Polls writing free RX descriptors from the pool into the RX FIFO.
    fn poll_submit_rx(&self, cx: &mut Context<'_>) -> Poll<Result<usize>> {
        self.pool.rx_pending.poll_submit(&self.rx, cx)
    }

    /// Polls the RX FIFO for the next completed (received) descriptor.
    fn poll_complete_rx(&self, cx: &mut Context<'_>) -> Poll<Result<DescId<Rx>>> {
        let mut rx_ready = self.rx_ready.lock();
        rx_ready.poll_with_fifo(cx, &self.rx).map_err(|status| Error::Fifo("read", "rx", status))
    }

    /// Polls writing queued TX descriptors into the TX FIFO.
    fn poll_submit_tx(&self, cx: &mut Context<'_>) -> Poll<Result<usize>> {
        self.tx_pending.poll_submit(&self.tx, cx)
    }

    /// Polls the TX FIFO for one completed send; returns the descriptor to
    /// the pool and notifies idle listeners on success.
    fn poll_complete_tx(&self, cx: &mut Context<'_>) -> Poll<Result<()>> {
        let result = {
            // Scope the lock so it is released before notifying listeners.
            let mut tx_ready = self.tx_ready.lock();
            tx_ready.poll_with_fifo(cx, &self.tx).map(|r| match r {
                Ok(desc) => self.pool.tx_completed(desc),
                Err(status) => Err(Error::Fifo("read", "tx", status)),
            })
        };

        match &result {
            Poll::Ready(Ok(())) => self.tx_idle_listeners.tx_complete(),
            Poll::Pending | Poll::Ready(Err(_)) => {}
        }
        result
    }

    /// Pads and commits `buffer`, then queues its descriptor for submission
    /// by the task. Counts the frame as in flight for idle tracking.
    fn send(&self, mut buffer: Buffer<Tx>) -> Result<()> {
        buffer.pad()?;
        buffer.commit();
        self.tx_idle_listeners.tx_started();
        self.tx_pending.extend(std::iter::once(buffer.leak()));
        Ok(())
    }

    /// Waits for the next completed RX descriptor and converts it into a
    /// readable buffer.
    async fn recv(&self) -> Result<Buffer<Rx>> {
        poll_fn(|cx| -> Poll<Result<Buffer<Rx>>> {
            let head = ready!(self.poll_complete_rx(cx))?;
            Poll::Ready(self.pool.rx_completed(head))
        })
        .await
    }
}
326
/// The future that drives a [`Session`]'s descriptor FIFOs.
///
/// Must be polled for the session to send or receive; it never resolves
/// successfully, only with an error.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Task {
    // Shared with the `Session` handles created alongside this task.
    inner: Arc<Inner>,
}
334
impl Future for Task {
    type Output = Result<()>;
    /// Repeatedly polls every sub-operation until all report `Pending`,
    /// because progress on one (e.g. a completed TX freeing a descriptor) can
    /// unblock another. RX completions are polled by `Session::recv`, not
    /// here.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let inner = &Pin::into_inner(self).inner;
        loop {
            let mut all_pending = true;
            // Drain all TX completions available right now.
            while inner.poll_complete_tx(cx)?.is_ready_checked::<()>() {
                all_pending = false;
            }
            if inner.poll_submit_rx(cx)?.is_ready_checked::<usize>() {
                all_pending = false;
            }
            if inner.poll_submit_tx(cx)?.is_ready_checked::<usize>() {
                all_pending = false;
            }
            if all_pending {
                return Poll::Pending;
            }
        }
    }
}
358
/// A session configuration derived from device limits via
/// [`DeviceInfo::make_config`].
#[derive(Debug, Clone, Copy)]
pub struct Config {
    /// Distance in bytes between the starts of consecutive buffers in the
    /// data VMO.
    buffer_stride: NonZeroU64,
    /// Number of RX descriptors/buffers.
    num_rx_buffers: NonZeroU16,
    /// Number of TX descriptors/buffers.
    num_tx_buffers: NonZeroU16,
    /// Flags passed to the device when opening the session.
    options: netdev::SessionFlags,
    /// Layout of each individual buffer.
    buffer_layout: BufferLayout,
}
373
/// In-memory layout parameters of a single session buffer.
#[derive(Debug, Clone, Copy)]
struct BufferLayout {
    /// Minimum number of data bytes a TX buffer must carry.
    min_tx_data: usize,
    /// Bytes reserved at the head of each TX buffer, per device requirement.
    min_tx_head: u16,
    /// Bytes reserved at the tail of each TX buffer, per device requirement.
    min_tx_tail: u16,
    /// Total length of each buffer, in bytes.
    length: usize,
}
386
/// Validated view of [`netdev::DeviceBaseInfo`]: required FIDL table fields
/// become plain fields, truly optional ones stay `Option`/defaulted.
#[derive(Debug, Clone, ValidFidlTable)]
#[fidl_table_src(netdev::DeviceBaseInfo)]
#[fidl_table_strict]
pub struct DeviceBaseInfo {
    // Device RX queue depth; bounds the number of RX buffers in a session.
    pub rx_depth: u16,
    // Device TX queue depth; bounds the number of TX buffers in a session.
    pub tx_depth: u16,
    // Required alignment for buffer start offsets (used to round the stride).
    pub buffer_alignment: u32,
    #[fidl_field_type(optional)]
    // Largest allowed buffer length, if the device reports one.
    pub max_buffer_length: Option<NonZeroU32>,
    // Smallest allowed RX buffer length.
    pub min_rx_buffer_length: u32,
    // Smallest allowed TX buffer length.
    pub min_tx_buffer_length: u32,
    // Head space the device requires in TX buffers.
    pub min_tx_buffer_head: u16,
    // Tail space the device requires in TX buffers.
    pub min_tx_buffer_tail: u16,
    // Maximum descriptor chain length supported by the device.
    pub max_buffer_parts: u8,
    #[fidl_field_type(optional)]
    // Minimum number of RX buffers, if the device reports one.
    pub min_rx_buffers: Option<NonZeroU16>,
    #[fidl_field_type(default)]
    // RX acceleration features; defaults to empty when absent.
    pub rx_accel: Vec<netdev::RxAcceleration>,
    #[fidl_field_type(default)]
    // TX acceleration features; defaults to empty when absent.
    pub tx_accel: Vec<netdev::TxAcceleration>,
}
422
/// Validated view of [`netdev::DeviceInfo`].
#[derive(Debug, Clone, ValidFidlTable)]
#[fidl_table_src(netdev::DeviceInfo)]
#[fidl_table_strict]
pub struct DeviceInfo {
    // Minimum descriptor length (in 64-bit words) the device accepts.
    pub min_descriptor_length: u8,
    // Descriptor format version the device speaks.
    pub descriptor_version: u8,
    // Common device limits and features.
    pub base_info: DeviceBaseInfo,
}
435
/// Caller-chosen knobs from which a full session [`Config`] is derived
/// against a device's limits.
#[derive(Debug, Copy, Clone)]
pub struct DerivableConfig {
    /// Preferred buffer length; clamped into the device's min/max bounds.
    pub default_buffer_length: usize,
    /// Whether to request delegated RX power leases from the device.
    pub watch_rx_leases: bool,
}
445
446impl DerivableConfig {
447 pub const DEFAULT_BUFFER_LENGTH: usize = 2048;
454 pub const DEFAULT: Self =
456 Self { default_buffer_length: Self::DEFAULT_BUFFER_LENGTH, watch_rx_leases: false };
457}
458
459impl Default for DerivableConfig {
460 fn default() -> Self {
461 Self::DEFAULT
462 }
463}
464
465impl DeviceInfo {
466 pub fn make_config(&self, config: DerivableConfig) -> Result<Config> {
471 let DeviceInfo {
472 min_descriptor_length,
473 descriptor_version,
474 base_info:
475 DeviceBaseInfo {
476 rx_depth,
477 tx_depth,
478 buffer_alignment,
479 max_buffer_length,
480 min_rx_buffer_length,
481 min_tx_buffer_length,
482 min_tx_buffer_head,
483 min_tx_buffer_tail,
484 max_buffer_parts: _,
485 min_rx_buffers: _,
486 rx_accel: _,
487 tx_accel: _,
488 },
489 } = self;
490 if NETWORK_DEVICE_DESCRIPTOR_VERSION != *descriptor_version {
491 return Err(Error::Config(format!(
492 "descriptor version mismatch: {} != {}",
493 NETWORK_DEVICE_DESCRIPTOR_VERSION, descriptor_version
494 )));
495 }
496 if NETWORK_DEVICE_DESCRIPTOR_LENGTH < usize::from(*min_descriptor_length) {
497 return Err(Error::Config(format!(
498 "descriptor length too small: {} < {}",
499 NETWORK_DEVICE_DESCRIPTOR_LENGTH, min_descriptor_length
500 )));
501 }
502
503 let DerivableConfig { default_buffer_length, watch_rx_leases } = config;
504
505 let num_rx_buffers =
506 NonZeroU16::new(*rx_depth).ok_or_else(|| Error::Config("no RX buffers".to_owned()))?;
507 let num_tx_buffers =
508 NonZeroU16::new(*tx_depth).ok_or_else(|| Error::Config("no TX buffers".to_owned()))?;
509
510 let max_buffer_length = max_buffer_length
511 .and_then(|max| {
512 usize::try_from(max.get()).ok_checked::<TryFromIntError>()
516 })
517 .unwrap_or(usize::MAX);
518 let min_buffer_length = usize::try_from(*min_rx_buffer_length)
519 .ok_checked::<TryFromIntError>()
520 .unwrap_or(usize::MAX);
521
522 let buffer_length =
523 usize::min(max_buffer_length, usize::max(min_buffer_length, default_buffer_length));
524
525 let buffer_alignment = usize::try_from(*buffer_alignment).map_err(
526 |std::num::TryFromIntError { .. }| {
527 Error::Config(format!(
528 "buffer_alignment not representable within usize: {}",
529 buffer_alignment,
530 ))
531 },
532 )?;
533
534 let buffer_stride = buffer_length
535 .checked_add(buffer_alignment - 1)
536 .map(|x| x / buffer_alignment * buffer_alignment)
537 .ok_or_else(|| {
538 Error::Config(format!(
539 "not possible to align {} to {} under usize::MAX",
540 buffer_length, buffer_alignment,
541 ))
542 })?;
543
544 if buffer_stride < buffer_length {
545 return Err(Error::Config(format!(
546 "buffer stride too small {} < {}",
547 buffer_stride, buffer_length
548 )));
549 }
550
551 if buffer_length < usize::from(*min_tx_buffer_head) + usize::from(*min_tx_buffer_tail) {
552 return Err(Error::Config(format!(
553 "buffer length {} does not meet minimum tx buffer head/tail requirement {}/{}",
554 buffer_length, min_tx_buffer_head, min_tx_buffer_tail,
555 )));
556 }
557
558 let num_buffers =
559 rx_depth.checked_add(*tx_depth).filter(|num| *num != u16::MAX).ok_or_else(|| {
560 Error::Config(format!(
561 "too many buffers requested: {} + {} > u16::MAX",
562 rx_depth, tx_depth
563 ))
564 })?;
565
566 let buffer_stride =
567 u64::try_from(buffer_stride).map_err(|std::num::TryFromIntError { .. }| {
568 Error::Config(format!("buffer_stride too big: {} > u64::MAX", buffer_stride))
569 })?;
570
571 match buffer_stride.checked_mul(num_buffers.into()).map(isize::try_from) {
575 None | Some(Err(std::num::TryFromIntError { .. })) => {
576 return Err(Error::Config(format!(
577 "too much memory required for the buffers: {} * {} > isize::MAX",
578 buffer_stride, num_buffers
579 )));
580 }
581 Some(Ok(_total)) => (),
582 };
583
584 let buffer_stride = NonZeroU64::new(buffer_stride)
585 .ok_or_else(|| Error::Config("buffer_stride is zero".to_owned()))?;
586
587 let min_tx_data = match usize::try_from(*min_tx_buffer_length)
588 .map(|min_tx| (min_tx <= buffer_length).then_some(min_tx))
589 {
590 Ok(Some(min_tx_buffer_length)) => min_tx_buffer_length,
591 Ok(None) | Err(std::num::TryFromIntError { .. }) => {
593 return Err(Error::Config(format!(
594 "buffer_length smaller than minimum TX requirement: {} < {}",
595 buffer_length, *min_tx_buffer_length
596 )));
597 }
598 };
599
600 let mut options = netdev::SessionFlags::empty();
601 options.set(netdev::SessionFlags::RECEIVE_RX_POWER_LEASES, watch_rx_leases);
602
603 Ok(Config {
604 buffer_stride,
605 num_rx_buffers,
606 num_tx_buffers,
607 options,
608 buffer_layout: BufferLayout {
609 length: buffer_length,
610 min_tx_head: *min_tx_buffer_head,
611 min_tx_tail: *min_tx_buffer_tail,
612 min_tx_data,
613 },
614 })
615 }
616}
617
/// An identifier for a port on a network device.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Port {
    // Base port identifier.
    pub(crate) base: u8,
    // Salt accompanying `base`; see the fuchsia.hardware.network FIDL
    // definition of `PortId` for its exact semantics.
    pub(crate) salt: u8,
}
624
625impl TryFrom<netdev::PortId> for Port {
626 type Error = Error;
627 fn try_from(netdev::PortId { base, salt }: netdev::PortId) -> Result<Self> {
628 if base <= netdev::MAX_PORTS {
629 Ok(Self { base, salt })
630 } else {
631 Err(Error::InvalidPortId(base))
632 }
633 }
634}
635
636impl From<Port> for netdev::PortId {
637 fn from(Port { base, salt }: Port) -> Self {
638 Self { base, salt }
639 }
640}
641
/// Descriptors waiting to be submitted to a FIFO, together with the waker of
/// the task blocked waiting for descriptors to arrive.
struct Pending<K: AllocKind> {
    inner: Mutex<(Vec<DescId<K>>, Option<Waker>)>,
}
646
impl<K: AllocKind> Pending<K> {
    /// Creates the queue seeded with `descs`.
    fn new(descs: Vec<DescId<K>>) -> Self {
        Self { inner: Mutex::new((descs, None)) }
    }

    /// Appends `descs` and wakes the task blocked in [`Pending::poll_submit`],
    /// if any.
    fn extend(&self, descs: impl IntoIterator<Item = DescId<K>>) {
        let mut guard = self.inner.lock();
        let (storage, waker) = &mut *guard;
        storage.extend(descs);
        if let Some(waker) = waker.take() {
            waker.wake();
        }
    }

    /// Attempts to write all queued descriptors into `fifo`, returning how
    /// many were accepted and dropping them from the queue.
    ///
    /// Returns `Pending` when the queue is empty (registering this task's
    /// waker, woken by [`Pending::extend`]) or when the FIFO cannot accept
    /// writes (waker registered by the FIFO itself).
    fn poll_submit(
        &self,
        fifo: &fasync::Fifo<DescId<K>>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<usize>> {
        let mut guard = self.inner.lock();
        let (storage, waker) = &mut *guard;
        if storage.is_empty() {
            *waker = Some(cx.waker().clone());
            return Poll::Pending;
        }

        let submitted = ready!(fifo.try_write(cx, &storage[..]))
            .map_err(|status| Error::Fifo("write", K::REFL.as_str(), status))?;
        // Only the accepted prefix leaves the queue.
        let _drained = storage.drain(0..submitted);
        Poll::Ready(Ok(submitted))
    }
}
689
/// Holds descriptors read from a FIFO in batches and consumed one at a time.
struct ReadyBuffer<T> {
    // Backing storage. Invariant: exactly the entries indexed by `available`
    // are initialized.
    data: Vec<MaybeUninit<T>>,
    // Index range of initialized, not-yet-consumed entries in `data`.
    available: Range<usize>,
}
705
impl<T> Drop for ReadyBuffer<T> {
    /// Drops the initialized entries; the rest of `data` is uninitialized and
    /// must not be touched.
    fn drop(&mut self) {
        let Self { data, available } = self;
        for initialized in &mut data[available.clone()] {
            // SAFETY: by the struct invariant, every entry in `available` was
            // initialized by a FIFO read and has not been consumed yet.
            unsafe { initialized.assume_init_drop() }
        }
        *available = 0..0;
    }
}
717
impl<T> ReadyBuffer<T> {
    /// Creates a buffer with room for `capacity` entries, all uninitialized.
    fn new(capacity: usize) -> Self {
        let data = std::iter::from_fn(|| Some(MaybeUninit::uninit())).take(capacity).collect();
        Self { data, available: 0..0 }
    }

    /// Pops the next available entry, refilling the buffer from `fifo` when
    /// empty. Returns `Pending` (waker registered by the FIFO) when nothing
    /// is buffered and the FIFO has nothing to read.
    fn poll_with_fifo(
        &mut self,
        cx: &mut Context<'_>,
        fifo: &fuchsia_async::Fifo<T>,
    ) -> Poll<std::result::Result<T, zx::Status>>
    where
        T: fasync::FifoEntry,
    {
        let Self { data, available: Range { start, end } } = self;

        loop {
            if *start != *end {
                // Swap uninit into the slot so `Drop` won't double-drop the
                // entry we are moving out.
                let desc = std::mem::replace(&mut data[*start], MaybeUninit::uninit());
                *start += 1;
                // SAFETY: `start..end` entries were initialized by the FIFO
                // read below and have not yet been consumed.
                let desc = unsafe { desc.assume_init() };
                return Poll::Ready(Ok(desc));
            }
            // Buffer exhausted: read the next batch; `count` entries of
            // `data` become initialized.
            let count = ready!(fifo.try_read(cx, &mut data[..]))?;
            *start = 0;
            *end = count;
        }
    }
}
751
/// Tracks TX frames in flight and notifies tasks waiting for TX to go idle.
struct TxIdleListeners {
    // Notified (all listeners) whenever `tx_in_flight` drops to zero.
    event: event_listener::Event,
    // Number of TX frames sent but not yet completed by the device.
    tx_in_flight: AtomicUsize,
}
756
impl TxIdleListeners {
    /// Creates the tracker with zero frames in flight.
    fn new() -> Self {
        Self { event: event_listener::Event::new(), tx_in_flight: AtomicUsize::new(0) }
    }

    /// Records completion of one TX frame; notifies all listeners if this was
    /// the last frame in flight.
    fn tx_complete(&self) {
        let Self { event, tx_in_flight } = self;
        let old_value = tx_in_flight.fetch_sub(1, atomic::Ordering::SeqCst);
        // Completing with nothing in flight indicates an accounting bug.
        debug_assert_ne!(old_value, 0);
        if old_value == 1 {
            let _notified: usize = event.notify(usize::MAX);
        }
    }

    /// Records the start of one TX frame.
    fn tx_started(&self) {
        let Self { event: _, tx_in_flight } = self;
        let _: usize = tx_in_flight.fetch_add(1, atomic::Ordering::SeqCst);
    }

    /// Resolves once no TX frames are in flight.
    async fn wait(&self) {
        let Self { event, tx_in_flight } = self;
        loop {
            if tx_in_flight.load(atomic::Ordering::SeqCst) == 0 {
                return;
            }

            // Register the listener before re-checking the counter so a
            // notification arriving between the check and the await is not
            // lost (the event-listener double-check idiom).
            event_listener::listener!(event => listener);

            if tx_in_flight.load(atomic::Ordering::SeqCst) == 0 {
                return;
            }

            listener.await;
        }
    }
}
805
/// An RX power lease delegated by the device.
///
/// Dropping the value drops the underlying handle, relinquishing the lease.
#[derive(Debug)]
pub struct RxLease {
    // The wire handle backing the lease.
    handle: netdev::DelegatedRxLeaseHandle,
}
815
impl Drop for RxLease {
    /// The arms are intentionally empty: releasing the lease is just dropping
    /// the contained handle. Matching exhaustively on the handle variants
    /// forces this impl to be revisited if new variants are added.
    fn drop(&mut self) {
        let Self { handle } = self;
        match handle {
            netdev::DelegatedRxLeaseHandle::Channel(_channel) => {
            }
            netdev::DelegatedRxLeaseHandle::Eventpair(_eventpair) => {
            }
            netdev::DelegatedRxLeaseHandle::__SourceBreaking { .. } => {}
        }
    }
}
832
833impl RxLease {
834 pub fn inner(&self) -> &netdev::DelegatedRxLeaseHandle {
836 &self.handle
837 }
838}
839
/// Unit tests for config derivation and the session task's FIFO error paths.
#[cfg(test)]
mod tests {
    use std::num::NonZeroU32;
    use std::ops::Deref;
    use std::sync::Arc;
    use std::task::Poll;

    use assert_matches::assert_matches;
    use fuchsia_async::Fifo;
    use test_case::test_case;
    use zerocopy::{FromBytes, Immutable, IntoBytes};
    use zx::HandleBased as _;

    use crate::session::DerivableConfig;

    use super::buffer::{
        AllocKind, DescId, NETWORK_DEVICE_DESCRIPTOR_LENGTH, NETWORK_DEVICE_DESCRIPTOR_VERSION,
    };
    use super::{
        BufferLayout, Config, DeviceBaseInfo, DeviceInfo, Error, Inner, Mutex, Pending, Pool,
        ReadyBuffer, Task, TxIdleListeners,
    };

    // A minimal valid device description; individual tests override single
    // fields to provoke specific config errors.
    const DEFAULT_DEVICE_BASE_INFO: DeviceBaseInfo = DeviceBaseInfo {
        rx_depth: 1,
        tx_depth: 1,
        buffer_alignment: 1,
        max_buffer_length: None,
        min_rx_buffer_length: 0,
        min_tx_buffer_head: 0,
        min_tx_buffer_length: 0,
        min_tx_buffer_tail: 0,
        max_buffer_parts: fidl_fuchsia_hardware_network::MAX_DESCRIPTOR_CHAIN,
        min_rx_buffers: None,
        rx_accel: Vec::new(),
        tx_accel: Vec::new(),
    };

    const DEFAULT_DEVICE_INFO: DeviceInfo = DeviceInfo {
        min_descriptor_length: 0,
        descriptor_version: 1,
        base_info: DEFAULT_DEVICE_BASE_INFO,
    };

    const DEFAULT_BUFFER_LENGTH: usize = 2048;

    // Each case: requested buffer length + device description that must be
    // rejected, and the exact expected `Error::Config` message.
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        min_descriptor_length: u8::MAX,
        ..DEFAULT_DEVICE_INFO
    }, format!("descriptor length too small: {} < {}", NETWORK_DEVICE_DESCRIPTOR_LENGTH, u8::MAX))]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        descriptor_version: 42,
        ..DEFAULT_DEVICE_INFO
    }, format!("descriptor version mismatch: {} != {}", NETWORK_DEVICE_DESCRIPTOR_VERSION, 42))]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            tx_depth: 0,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, "no TX buffers")]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            rx_depth: 0,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, "no RX buffers")]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            tx_depth: u16::MAX,
            rx_depth: u16::MAX,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, format!("too many buffers requested: {} + {} > u16::MAX", u16::MAX, u16::MAX))]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            min_tx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 + 1,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, format!(
        "buffer_length smaller than minimum TX requirement: {} < {}",
        DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            min_tx_buffer_head: DEFAULT_BUFFER_LENGTH as u16 + 1,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, format!(
        "buffer length {} does not meet minimum tx buffer head/tail requirement {}/0",
        DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            min_tx_buffer_tail: DEFAULT_BUFFER_LENGTH as u16 + 1,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, format!(
        "buffer length {} does not meet minimum tx buffer head/tail requirement 0/{}",
        DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
    #[test_case(0, DEFAULT_DEVICE_INFO, "buffer_stride is zero")]
    #[test_case(usize::MAX, DEFAULT_DEVICE_INFO,
        format!(
            "too much memory required for the buffers: {} * {} > isize::MAX",
            usize::MAX, 2))]
    #[test_case(usize::MAX, DeviceInfo {
        base_info: DeviceBaseInfo {
            buffer_alignment: 2,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, format!(
        "not possible to align {} to {} under usize::MAX",
        usize::MAX, 2))]
    fn configs_from_device_info_err(
        buffer_length: usize,
        info: DeviceInfo,
        expected: impl Deref<Target = str>,
    ) {
        let config = DerivableConfig { default_buffer_length: buffer_length, ..Default::default() };
        assert_matches!(
            info.make_config(config),
            Err(Error::Config(got)) if got.as_str() == expected.deref()
        );
    }

    // Verifies the requested buffer length is clamped into the device's
    // [min, max] bounds.
    #[test_case(DeviceInfo {
        base_info: DeviceBaseInfo {
            min_rx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 + 1,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, DEFAULT_BUFFER_LENGTH + 1; "default below min")]
    #[test_case(DeviceInfo {
        base_info: DeviceBaseInfo {
            max_buffer_length: NonZeroU32::new(DEFAULT_BUFFER_LENGTH as u32 - 1),
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, DEFAULT_BUFFER_LENGTH - 1; "default above max")]
    #[test_case(DeviceInfo {
        base_info: DeviceBaseInfo {
            min_rx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 - 1,
            max_buffer_length: NonZeroU32::new(DEFAULT_BUFFER_LENGTH as u32 + 1),
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, DEFAULT_BUFFER_LENGTH; "default in bounds")]
    fn configs_from_device_buffer_length(info: DeviceInfo, expected_length: usize) {
        let config = info
            .make_config(DerivableConfig {
                default_buffer_length: DEFAULT_BUFFER_LENGTH,
                ..Default::default()
            })
            .expect("is valid");
        let Config {
            buffer_layout: BufferLayout { length, min_tx_data: _, min_tx_head: _, min_tx_tail: _ },
            buffer_stride: _,
            num_rx_buffers: _,
            num_tx_buffers: _,
            options: _,
        } = config;
        assert_eq!(length, expected_length);
    }

    // Creates a depth-1 FIFO pair: an async wrapper plus the raw other end.
    fn make_fifos<K: AllocKind>() -> (Fifo<DescId<K>>, zx::Fifo<DescId<K>>) {
        let (handle, other_end) = zx::Fifo::create(1).unwrap();
        (Fifo::from_fifo(handle), other_end)
    }

    // Returns `fifo` with `rights_to_remove` stripped from its handle rights.
    fn remove_rights<T: FromBytes + IntoBytes + Immutable>(
        fifo: Fifo<T>,
        rights_to_remove: zx::Rights,
    ) -> Fifo<T> {
        let fifo = zx::Fifo::from(fifo);
        let rights = fifo.as_handle_ref().basic_info().expect("can retrieve info").rights;

        let fifo = fifo.replace_handle(rights ^ rights_to_remove).expect("can replace");
        Fifo::from_fifo(fifo)
    }

    // Selects which of the two session FIFOs gets its rights stripped.
    enum TxOrRx {
        Tx,
        Rx,
    }
    #[test_case(TxOrRx::Tx, zx::Rights::READ; "tx read")]
    #[test_case(TxOrRx::Tx, zx::Rights::WRITE; "tx write")]
    // NOTE(review): the case name says "rx read" but removes WRITE; likely
    // intended to be "rx write". The name only affects test reporting.
    #[test_case(TxOrRx::Rx, zx::Rights::WRITE; "rx read")]
    #[fuchsia_async::run_singlethreaded(test)]
    async fn task_as_future_poll_error(which_fifo: TxOrRx, right_to_remove: zx::Rights) {
        // Build a pool and a fake Inner wired to local FIFOs, one of which
        // has a required right removed so FIFO operations fail.
        let (pool, _descriptors, _data) = Pool::new(
            DEFAULT_DEVICE_INFO
                .make_config(DerivableConfig {
                    default_buffer_length: DEFAULT_BUFFER_LENGTH,
                    ..Default::default()
                })
                .expect("is valid"),
        )
        .expect("is valid");
        let (session_proxy, _session_server) =
            fidl::endpoints::create_proxy::<fidl_fuchsia_hardware_network::SessionMarker>();

        let (rx, _rx_sender) = make_fifos();
        let (tx, _tx_receiver) = make_fifos();

        let (tx, rx) = match which_fifo {
            TxOrRx::Tx => (remove_rights(tx, right_to_remove), rx),
            TxOrRx::Rx => (tx, remove_rights(rx, right_to_remove)),
        };

        let buf = pool.alloc_tx_buffer(1).await.expect("can allocate");
        let inner = Arc::new(Inner {
            pool,
            proxy: session_proxy,
            name: "fake_task".to_string(),
            rx,
            tx,
            tx_pending: Pending::new(vec![]),
            rx_ready: Mutex::new(ReadyBuffer::new(10)),
            tx_ready: Mutex::new(ReadyBuffer::new(10)),
            tx_idle_listeners: TxIdleListeners::new(),
        });

        inner.send(buf).expect("can send");

        let mut task = Task { inner };

        // The crippled FIFO must surface as a FIFO error from the task.
        assert_matches!(futures::poll!(&mut task), Poll::Ready(Err(Error::Fifo(_, _, _))));
    }
}