1mod buffer;
8
9use std::fmt::Debug;
10use std::mem::MaybeUninit;
11use std::num::{NonZeroU16, NonZeroU32, NonZeroU64, TryFromIntError};
12use std::ops::Range;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::sync::atomic::{self, AtomicUsize};
16use std::task::Waker;
17
18use explicit::{PollExt as _, ResultExt as _};
19use fidl_fuchsia_hardware_network as netdev;
20use fidl_fuchsia_hardware_network::DelegatedRxLease;
21use fidl_table_validation::ValidFidlTable;
22use fuchsia_async as fasync;
23use fuchsia_sync::Mutex;
24use futures::future::{Future, poll_fn};
25use futures::task::{Context, Poll};
26use futures::{Stream, StreamExt as _, ready};
27
28use crate::error::{Error, Result};
29use buffer::pool::{Pool, RxLeaseWatcher};
30use buffer::{
31 AllocKind, DescId, NETWORK_DEVICE_DESCRIPTOR_LENGTH, NETWORK_DEVICE_DESCRIPTOR_VERSION,
32};
33pub use buffer::{Buffer, Rx, SinglePartTxBuffer, Tx};
34
35#[derive(Clone)]
37pub struct Session {
38 inner: Arc<Inner>,
39}
40
41impl Debug for Session {
42 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43 let Self { inner } = self;
44 let Inner {
45 name,
46 pool: _,
47 proxy: _,
48 rx: _,
49 tx: _,
50 tx_pending: _,
51 rx_ready: _,
52 tx_ready: _,
53 tx_idle_listeners: _,
54 } = &**inner;
55 f.debug_struct("Session").field("name", &name).finish_non_exhaustive()
56 }
57}
58
59impl Session {
60 pub async fn new(
62 device: &netdev::DeviceProxy,
63 name: &str,
64 config: Config,
65 ) -> Result<(Self, Task)> {
66 let inner = Inner::new(device, name, config).await?;
67 Ok((Session { inner: Arc::clone(&inner) }, Task { inner }))
68 }
69
70 pub fn send(&self, buffer: Buffer<Tx>) {
72 self.inner.send(buffer)
73 }
74
75 pub async fn recv(&self) -> Result<Buffer<Rx>> {
77 self.inner.recv().await
78 }
79
80 pub async fn alloc_tx_buffer(&self, num_bytes: usize) -> Result<Buffer<Tx>> {
84 self.inner.pool.alloc_tx_buffer(num_bytes).await
85 }
86
87 pub fn try_alloc_single_part_tx_buffer(
92 &self,
93 num_bytes: usize,
94 ) -> Result<Option<SinglePartTxBuffer>> {
95 self.inner.pool.try_alloc_single_part_tx_buffer(num_bytes)
96 }
97
98 pub async fn alloc_tx_buffers(
111 &self,
112 num_bytes: usize,
113 ) -> Result<impl Iterator<Item = Result<Buffer<Tx>>> + '_> {
114 self.inner.pool.alloc_tx_buffers(num_bytes).await
115 }
116
117 pub async fn attach(&self, port: Port, rx_frames: &[netdev::FrameType]) -> Result<()> {
119 let fut = self.inner.proxy.attach(&port.into(), rx_frames);
126 let () = fut.await?.map_err(|raw| Error::Attach(port, zx::Status::from_raw(raw)))?;
127 Ok(())
128 }
129
130 pub async fn detach(&self, port: Port) -> Result<()> {
132 let () = self
133 .inner
134 .proxy
135 .detach(&port.into())
136 .await?
137 .map_err(|raw| Error::Detach(port, zx::Status::from_raw(raw)))?;
138 Ok(())
139 }
140
141 pub async fn wait_tx_idle(&self) {
154 self.inner.tx_idle_listeners.wait().await;
155 }
156
157 pub fn watch_rx_leases(&self) -> impl Stream<Item = Result<RxLease>> + Send + Sync + use<> {
172 let inner = Arc::clone(&self.inner);
173 let watcher = RxLeaseWatcher::new(Arc::clone(&inner.pool));
174 futures::stream::try_unfold((inner, watcher), |(inner, mut watcher)| async move {
175 let DelegatedRxLease {
176 hold_until_frame,
177 handle,
178 __source_breaking: fidl::marker::SourceBreaking,
179 } = match inner.proxy.watch_delegated_rx_lease().await {
180 Ok(lease) => lease,
181 Err(e) => {
182 if e.is_closed() {
183 return Ok(None);
184 } else {
185 return Err(Error::Fidl(e));
186 }
187 }
188 };
189 let hold_until_frame = hold_until_frame.ok_or(Error::InvalidLease)?;
190 let handle = RxLease { handle: handle.ok_or(Error::InvalidLease)? };
191
192 watcher.wait_until(hold_until_frame).await;
193 Ok(Some((handle, (inner, watcher))))
194 })
195 }
196
197 pub async fn close(&self) -> Result<()> {
199 self.inner.proxy.close()?;
200 let mut event_stream = self.inner.proxy.take_event_stream();
202 while let Some(event) = event_stream.next().await {
203 match event {
204 Err(fidl::Error::ClientChannelClosed { .. }) => break,
205 Ok(_) => {} Err(e) => return Err(Error::Fidl(e)),
207 }
208 }
209 Ok(())
210 }
211}
212
213struct Inner {
214 pool: Arc<Pool>,
215 proxy: netdev::SessionProxy,
216 name: String,
217 rx: fasync::Fifo<DescId<Rx>>,
218 tx: fasync::Fifo<DescId<Tx>>,
219 tx_pending: Pending<Tx>,
221 rx_ready: Mutex<ReadyBuffer<DescId<Rx>>>,
222 tx_ready: Mutex<ReadyBuffer<DescId<Tx>>>,
223 tx_idle_listeners: TxIdleListeners,
224}
225
226impl Inner {
227 async fn new(device: &netdev::DeviceProxy, name: &str, config: Config) -> Result<Arc<Self>> {
229 let (pool, descriptors, data) = Pool::new(config)?;
230
231 let session_info = {
232 let descriptor_length =
235 u8::try_from(NETWORK_DEVICE_DESCRIPTOR_LENGTH / std::mem::size_of::<u64>())
236 .expect("descriptor length in 64-bit words not representable by u8");
237 let data = vec![fidl_fuchsia_hardware_network::DataVmo {
238 id: Some(0),
239 vmo: Some(data),
240 num_rx_buffers: Some(config.num_rx_buffers.get()),
241 __source_breaking: fidl::marker::SourceBreaking,
242 }];
243 netdev::SessionInfo {
244 descriptors: Some(descriptors),
245 data: Some(data),
246 descriptor_version: Some(NETWORK_DEVICE_DESCRIPTOR_VERSION),
247 descriptor_length: Some(descriptor_length),
248 descriptor_count: Some(config.num_tx_buffers.get() + config.num_rx_buffers.get()),
249 options: Some(config.options),
250 ..Default::default()
251 }
252 };
253
254 let (client, netdev::Fifos { rx, tx }) = device
255 .open_session(name, session_info)
256 .await?
257 .map_err(|raw| Error::Open(name.to_owned(), zx::Status::from_raw(raw)))?;
258 let proxy = client.into_proxy();
259 let rx = fasync::Fifo::from_fifo(rx);
260 let tx = fasync::Fifo::from_fifo(tx);
261
262 Ok(Arc::new(Self {
263 pool,
264 proxy,
265 name: name.to_owned(),
266 rx,
267 tx,
268 tx_pending: Pending::new(Vec::new()),
269 rx_ready: Mutex::new(ReadyBuffer::new(config.num_rx_buffers.get().into())),
270 tx_ready: Mutex::new(ReadyBuffer::new(config.num_tx_buffers.get().into())),
271 tx_idle_listeners: TxIdleListeners::new(),
272 }))
273 }
274
275 fn poll_submit_rx(&self, cx: &mut Context<'_>) -> Poll<Result<usize>> {
279 self.pool.rx_pending.poll_submit(&self.rx, cx)
280 }
281
282 fn poll_complete_rx(&self, cx: &mut Context<'_>) -> Poll<Result<DescId<Rx>>> {
286 let mut rx_ready = self.rx_ready.lock();
287 rx_ready.poll_with_fifo(cx, &self.rx).map_err(|status| Error::Fifo("read", "rx", status))
288 }
289
290 fn poll_submit_tx(&self, cx: &mut Context<'_>) -> Poll<Result<usize>> {
294 self.tx_pending.poll_submit(&self.tx, cx)
295 }
296
297 fn poll_complete_tx(&self, cx: &mut Context<'_>) -> Poll<Result<()>> {
299 let result = {
300 let mut tx_ready = self.tx_ready.lock();
301 tx_ready.poll_with_fifo(cx, &self.tx).map(|r| match r {
305 Ok(desc) => self.pool.tx_completed(desc),
306 Err(status) => Err(Error::Fifo("read", "tx", status)),
307 })
308 };
309
310 match &result {
311 Poll::Ready(Ok(())) => self.tx_idle_listeners.tx_complete(),
312 Poll::Pending | Poll::Ready(Err(_)) => {}
313 }
314 result
315 }
316
317 fn send(&self, buffer: Buffer<Tx>) {
324 self.tx_idle_listeners.tx_started();
325 self.tx_pending.extend(std::iter::once(buffer.leak()));
326 }
327
328 async fn recv(&self) -> Result<Buffer<Rx>> {
332 poll_fn(|cx| -> Poll<Result<Buffer<Rx>>> {
333 let head = ready!(self.poll_complete_rx(cx))?;
334 Poll::Ready(self.pool.rx_completed(head))
335 })
336 .await
337 }
338}
339
340#[must_use = "futures do nothing unless you `.await` or poll them"]
344pub struct Task {
345 inner: Arc<Inner>,
346}
347
348impl Future for Task {
349 type Output = Result<()>;
350 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
351 let inner = &Pin::into_inner(self).inner;
352 loop {
353 let mut all_pending = true;
354 while inner.poll_complete_tx(cx)?.is_ready_checked::<()>() {
357 all_pending = false;
358 }
359 if inner.poll_submit_rx(cx)?.is_ready_checked::<usize>() {
360 all_pending = false;
361 }
362 if inner.poll_submit_tx(cx)?.is_ready_checked::<usize>() {
363 all_pending = false;
364 }
365 if all_pending {
366 return Poll::Pending;
367 }
368 }
369 }
370}
371
372#[derive(Debug, Clone, Copy)]
374pub struct Config {
375 buffer_stride: NonZeroU64,
377 num_rx_buffers: NonZeroU16,
379 num_tx_buffers: NonZeroU16,
381 options: netdev::SessionFlags,
383 buffer_layout: BufferLayout,
385}
386
387#[derive(Debug, Clone, Copy)]
389struct BufferLayout {
390 min_tx_data: usize,
392 min_tx_head: u16,
394 min_tx_tail: u16,
396 length: usize,
398}
399
400#[derive(Debug, Clone, ValidFidlTable)]
402#[fidl_table_src(netdev::DeviceBaseInfo)]
403#[fidl_table_strict]
404pub struct DeviceBaseInfo {
405 pub rx_depth: u16,
407 pub tx_depth: u16,
409 pub buffer_alignment: u32,
411 #[fidl_field_type(optional)]
413 pub max_buffer_length: Option<NonZeroU32>,
414 pub min_rx_buffer_length: u32,
416 pub min_tx_buffer_length: u32,
418 pub min_tx_buffer_head: u16,
420 pub min_tx_buffer_tail: u16,
422 pub max_buffer_parts: u8,
424 #[fidl_field_type(optional)]
427 pub min_rx_buffers: Option<NonZeroU16>,
428 #[fidl_field_type(default)]
430 pub rx_accel: Vec<netdev::RxAcceleration>,
431 #[fidl_field_type(default)]
433 pub tx_accel: Vec<netdev::TxAcceleration>,
434}
435
436#[derive(Debug, Clone, ValidFidlTable)]
438#[fidl_table_src(netdev::DeviceInfo)]
439#[fidl_table_strict]
440pub struct DeviceInfo {
441 pub min_descriptor_length: u8,
443 pub descriptor_version: u8,
445 pub base_info: DeviceBaseInfo,
447}
448
449#[derive(Debug, Copy, Clone)]
452pub struct DerivableConfig {
453 pub default_buffer_length: usize,
455 pub watch_rx_leases: bool,
457}
458
459impl DerivableConfig {
460 pub const DEFAULT_BUFFER_LENGTH: usize = 2048;
467 pub const DEFAULT: Self =
469 Self { default_buffer_length: Self::DEFAULT_BUFFER_LENGTH, watch_rx_leases: false };
470}
471
472impl Default for DerivableConfig {
473 fn default() -> Self {
474 Self::DEFAULT
475 }
476}
477
478impl DeviceInfo {
479 pub fn make_config(&self, config: DerivableConfig) -> Result<Config> {
484 let DeviceInfo {
485 min_descriptor_length,
486 descriptor_version,
487 base_info:
488 DeviceBaseInfo {
489 rx_depth,
490 tx_depth,
491 buffer_alignment,
492 max_buffer_length,
493 min_rx_buffer_length,
494 min_tx_buffer_length,
495 min_tx_buffer_head,
496 min_tx_buffer_tail,
497 max_buffer_parts: _,
498 min_rx_buffers: _,
499 rx_accel: _,
500 tx_accel: _,
501 },
502 } = self;
503 if NETWORK_DEVICE_DESCRIPTOR_VERSION != *descriptor_version {
504 return Err(Error::Config(format!(
505 "descriptor version mismatch: {} != {}",
506 NETWORK_DEVICE_DESCRIPTOR_VERSION, descriptor_version
507 )));
508 }
509 if NETWORK_DEVICE_DESCRIPTOR_LENGTH < usize::from(*min_descriptor_length) {
510 return Err(Error::Config(format!(
511 "descriptor length too small: {} < {}",
512 NETWORK_DEVICE_DESCRIPTOR_LENGTH, min_descriptor_length
513 )));
514 }
515
516 let DerivableConfig { default_buffer_length, watch_rx_leases } = config;
517
518 let num_rx_buffers =
519 NonZeroU16::new(*rx_depth).ok_or_else(|| Error::Config("no RX buffers".to_owned()))?;
520 let num_tx_buffers =
521 NonZeroU16::new(*tx_depth).ok_or_else(|| Error::Config("no TX buffers".to_owned()))?;
522
523 let max_buffer_length = max_buffer_length
524 .and_then(|max| {
525 usize::try_from(max.get()).ok_checked::<TryFromIntError>()
529 })
530 .unwrap_or(usize::MAX);
531 let min_buffer_length = usize::try_from(*min_rx_buffer_length)
532 .ok_checked::<TryFromIntError>()
533 .unwrap_or(usize::MAX);
534
535 let buffer_length =
536 usize::min(max_buffer_length, usize::max(min_buffer_length, default_buffer_length));
537
538 let buffer_alignment = usize::try_from(*buffer_alignment).map_err(
539 |std::num::TryFromIntError { .. }| {
540 Error::Config(format!(
541 "buffer_alignment not representable within usize: {}",
542 buffer_alignment,
543 ))
544 },
545 )?;
546
547 let buffer_stride = buffer_length
548 .checked_add(buffer_alignment - 1)
549 .map(|x| x / buffer_alignment * buffer_alignment)
550 .ok_or_else(|| {
551 Error::Config(format!(
552 "not possible to align {} to {} under usize::MAX",
553 buffer_length, buffer_alignment,
554 ))
555 })?;
556
557 if buffer_stride < buffer_length {
558 return Err(Error::Config(format!(
559 "buffer stride too small {} < {}",
560 buffer_stride, buffer_length
561 )));
562 }
563
564 if buffer_length < usize::from(*min_tx_buffer_head) + usize::from(*min_tx_buffer_tail) {
565 return Err(Error::Config(format!(
566 "buffer length {} does not meet minimum tx buffer head/tail requirement {}/{}",
567 buffer_length, min_tx_buffer_head, min_tx_buffer_tail,
568 )));
569 }
570
571 let num_buffers =
572 rx_depth.checked_add(*tx_depth).filter(|num| *num != u16::MAX).ok_or_else(|| {
573 Error::Config(format!(
574 "too many buffers requested: {} + {} > u16::MAX",
575 rx_depth, tx_depth
576 ))
577 })?;
578
579 let buffer_stride =
580 u64::try_from(buffer_stride).map_err(|std::num::TryFromIntError { .. }| {
581 Error::Config(format!("buffer_stride too big: {} > u64::MAX", buffer_stride))
582 })?;
583
584 match buffer_stride.checked_mul(num_buffers.into()).map(isize::try_from) {
588 None | Some(Err(std::num::TryFromIntError { .. })) => {
589 return Err(Error::Config(format!(
590 "too much memory required for the buffers: {} * {} > isize::MAX",
591 buffer_stride, num_buffers
592 )));
593 }
594 Some(Ok(_total)) => (),
595 };
596
597 let buffer_stride = NonZeroU64::new(buffer_stride)
598 .ok_or_else(|| Error::Config("buffer_stride is zero".to_owned()))?;
599
600 let min_tx_data = match usize::try_from(*min_tx_buffer_length)
601 .map(|min_tx| (min_tx <= buffer_length).then_some(min_tx))
602 {
603 Ok(Some(min_tx_buffer_length)) => min_tx_buffer_length,
604 Ok(None) | Err(std::num::TryFromIntError { .. }) => {
606 return Err(Error::Config(format!(
607 "buffer_length smaller than minimum TX requirement: {} < {}",
608 buffer_length, *min_tx_buffer_length
609 )));
610 }
611 };
612
613 let mut options = netdev::SessionFlags::empty();
614 options.set(netdev::SessionFlags::RECEIVE_RX_POWER_LEASES, watch_rx_leases);
615
616 Ok(Config {
617 buffer_stride,
618 num_rx_buffers,
619 num_tx_buffers,
620 options,
621 buffer_layout: BufferLayout {
622 length: buffer_length,
623 min_tx_head: *min_tx_buffer_head,
624 min_tx_tail: *min_tx_buffer_tail,
625 min_tx_data,
626 },
627 })
628 }
629}
630
631#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
633pub struct Port {
634 pub(crate) base: u8,
635 pub(crate) salt: u8,
636}
637
638impl TryFrom<netdev::PortId> for Port {
639 type Error = Error;
640 fn try_from(netdev::PortId { base, salt }: netdev::PortId) -> Result<Self> {
641 if base <= netdev::MAX_PORTS {
642 Ok(Self { base, salt })
643 } else {
644 Err(Error::InvalidPortId(base))
645 }
646 }
647}
648
649impl From<Port> for netdev::PortId {
650 fn from(Port { base, salt }: Port) -> Self {
651 Self { base, salt }
652 }
653}
654
655struct Pending<K: AllocKind> {
657 inner: Mutex<(Vec<DescId<K>>, Option<Waker>)>,
658}
659
660impl<K: AllocKind> Pending<K> {
661 fn new(descs: Vec<DescId<K>>) -> Self {
662 Self { inner: Mutex::new((descs, None)) }
663 }
664
665 fn extend(&self, descs: impl IntoIterator<Item = DescId<K>>) {
667 let mut guard = self.inner.lock();
668 let (storage, waker) = &mut *guard;
669 storage.extend(descs);
670 if let Some(waker) = waker.take() {
671 waker.wake();
672 }
673 }
674
675 fn poll_submit(
681 &self,
682 fifo: &fasync::Fifo<DescId<K>>,
683 cx: &mut Context<'_>,
684 ) -> Poll<Result<usize>> {
685 let mut guard = self.inner.lock();
686 let (storage, waker) = &mut *guard;
687 if storage.is_empty() {
688 *waker = Some(cx.waker().clone());
689 return Poll::Pending;
690 }
691
692 let submitted = ready!(fifo.try_write(cx, &storage[..]))
697 .map_err(|status| Error::Fifo("write", K::REFL.as_str(), status))?;
698 let _drained = storage.drain(0..submitted);
699 Poll::Ready(Ok(submitted))
700 }
701}
702
703struct ReadyBuffer<T> {
710 data: Vec<MaybeUninit<T>>,
716 available: Range<usize>,
717}
718
719impl<T> Drop for ReadyBuffer<T> {
720 fn drop(&mut self) {
721 let Self { data, available } = self;
722 for initialized in &mut data[available.clone()] {
723 unsafe { initialized.assume_init_drop() }
726 }
727 *available = 0..0;
728 }
729}
730
731impl<T> ReadyBuffer<T> {
732 fn new(capacity: usize) -> Self {
733 let data = std::iter::from_fn(|| Some(MaybeUninit::uninit())).take(capacity).collect();
734 Self { data, available: 0..0 }
735 }
736
737 fn poll_with_fifo(
738 &mut self,
739 cx: &mut Context<'_>,
740 fifo: &fuchsia_async::Fifo<T>,
741 ) -> Poll<std::result::Result<T, zx::Status>>
742 where
743 T: fasync::FifoEntry,
744 {
745 let Self { data, available: Range { start, end } } = self;
746
747 loop {
748 if *start != *end {
750 let desc = std::mem::replace(&mut data[*start], MaybeUninit::uninit());
751 *start += 1;
752 let desc = unsafe { desc.assume_init() };
755 return Poll::Ready(Ok(desc));
756 }
757 let count = ready!(fifo.try_read(cx, &mut data[..]))?;
759 *start = 0;
760 *end = count;
761 }
762 }
763}
764
765struct TxIdleListeners {
766 event: event_listener::Event,
767 tx_in_flight: AtomicUsize,
768}
769
770impl TxIdleListeners {
771 fn new() -> Self {
772 Self { event: event_listener::Event::new(), tx_in_flight: AtomicUsize::new(0) }
773 }
774
775 fn tx_complete(&self) {
779 let Self { event, tx_in_flight } = self;
780 let old_value = tx_in_flight.fetch_sub(1, atomic::Ordering::SeqCst);
781 debug_assert_ne!(old_value, 0);
782 if old_value == 1 {
783 let _notified: usize = event.notify(usize::MAX);
784 }
785 }
786
787 fn tx_started(&self) {
789 let Self { event: _, tx_in_flight } = self;
790 let _: usize = tx_in_flight.fetch_add(1, atomic::Ordering::SeqCst);
791 }
792
793 async fn wait(&self) {
794 let Self { event, tx_in_flight } = self;
795 loop {
804 if tx_in_flight.load(atomic::Ordering::SeqCst) == 0 {
805 return;
806 }
807
808 event_listener::listener!(event => listener);
809
810 if tx_in_flight.load(atomic::Ordering::SeqCst) == 0 {
811 return;
812 }
813
814 listener.await;
815 }
816 }
817}
818
819#[derive(Debug)]
825pub struct RxLease {
826 handle: netdev::DelegatedRxLeaseHandle,
827}
828
829impl Drop for RxLease {
830 fn drop(&mut self) {
831 let Self { handle } = self;
832 match handle {
835 netdev::DelegatedRxLeaseHandle::Channel(_channel) => {
836 }
838 netdev::DelegatedRxLeaseHandle::Eventpair(_eventpair) => {
839 }
841 netdev::DelegatedRxLeaseHandle::__SourceBreaking { .. } => {}
842 }
843 }
844}
845
846impl RxLease {
847 pub fn inner(&self) -> &netdev::DelegatedRxLeaseHandle {
849 &self.handle
850 }
851}
852
853#[cfg(test)]
854mod tests {
855 use std::num::NonZeroU32;
856 use std::ops::Deref;
857 use std::sync::Arc;
858 use std::task::Poll;
859
860 use assert_matches::assert_matches;
861 use fuchsia_async::Fifo;
862 use test_case::test_case;
863 use zerocopy::{FromBytes, Immutable, IntoBytes};
864
865 use crate::session::DerivableConfig;
866
867 use super::buffer::{
868 AllocKind, DescId, NETWORK_DEVICE_DESCRIPTOR_LENGTH, NETWORK_DEVICE_DESCRIPTOR_VERSION,
869 };
870 use super::{
871 BufferLayout, Config, DeviceBaseInfo, DeviceInfo, Error, Inner, Mutex, Pending, Pool,
872 ReadyBuffer, Task, TxIdleListeners,
873 };
874
875 const DEFAULT_DEVICE_BASE_INFO: DeviceBaseInfo = DeviceBaseInfo {
876 rx_depth: 1,
877 tx_depth: 1,
878 buffer_alignment: 1,
879 max_buffer_length: None,
880 min_rx_buffer_length: 0,
881 min_tx_buffer_head: 0,
882 min_tx_buffer_length: 0,
883 min_tx_buffer_tail: 0,
884 max_buffer_parts: fidl_fuchsia_hardware_network::MAX_DESCRIPTOR_CHAIN,
885 min_rx_buffers: None,
886 rx_accel: Vec::new(),
887 tx_accel: Vec::new(),
888 };
889
890 const DEFAULT_DEVICE_INFO: DeviceInfo = DeviceInfo {
891 min_descriptor_length: 0,
892 descriptor_version: 1,
893 base_info: DEFAULT_DEVICE_BASE_INFO,
894 };
895
896 const DEFAULT_BUFFER_LENGTH: usize = 2048;
897
898 #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
899 min_descriptor_length: u8::MAX,
900 ..DEFAULT_DEVICE_INFO
901 }, format!("descriptor length too small: {} < {}", NETWORK_DEVICE_DESCRIPTOR_LENGTH, u8::MAX))]
902 #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
903 descriptor_version: 42,
904 ..DEFAULT_DEVICE_INFO
905 }, format!("descriptor version mismatch: {} != {}", NETWORK_DEVICE_DESCRIPTOR_VERSION, 42))]
906 #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
907 base_info: DeviceBaseInfo {
908 tx_depth: 0,
909 ..DEFAULT_DEVICE_BASE_INFO
910 },
911 ..DEFAULT_DEVICE_INFO
912 }, "no TX buffers")]
913 #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
914 base_info: DeviceBaseInfo {
915 rx_depth: 0,
916 ..DEFAULT_DEVICE_BASE_INFO
917 },
918 ..DEFAULT_DEVICE_INFO
919 }, "no RX buffers")]
920 #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
921 base_info: DeviceBaseInfo {
922 tx_depth: u16::MAX,
923 rx_depth: u16::MAX,
924 ..DEFAULT_DEVICE_BASE_INFO
925 },
926 ..DEFAULT_DEVICE_INFO
927 }, format!("too many buffers requested: {} + {} > u16::MAX", u16::MAX, u16::MAX))]
928 #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
929 base_info: DeviceBaseInfo {
930 min_tx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 + 1,
931 ..DEFAULT_DEVICE_BASE_INFO
932 },
933 ..DEFAULT_DEVICE_INFO
934 }, format!(
935 "buffer_length smaller than minimum TX requirement: {} < {}",
936 DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
937 #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
938 base_info: DeviceBaseInfo {
939 min_tx_buffer_head: DEFAULT_BUFFER_LENGTH as u16 + 1,
940 ..DEFAULT_DEVICE_BASE_INFO
941 },
942 ..DEFAULT_DEVICE_INFO
943 }, format!(
944 "buffer length {} does not meet minimum tx buffer head/tail requirement {}/0",
945 DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
946 #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
947 base_info: DeviceBaseInfo {
948 min_tx_buffer_tail: DEFAULT_BUFFER_LENGTH as u16 + 1,
949 ..DEFAULT_DEVICE_BASE_INFO
950 },
951 ..DEFAULT_DEVICE_INFO
952 }, format!(
953 "buffer length {} does not meet minimum tx buffer head/tail requirement 0/{}",
954 DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
955 #[test_case(0, DEFAULT_DEVICE_INFO, "buffer_stride is zero")]
956 #[test_case(usize::MAX, DEFAULT_DEVICE_INFO,
957 format!(
958 "too much memory required for the buffers: {} * {} > isize::MAX",
959 usize::MAX, 2))]
960 #[test_case(usize::MAX, DeviceInfo {
961 base_info: DeviceBaseInfo {
962 buffer_alignment: 2,
963 ..DEFAULT_DEVICE_BASE_INFO
964 },
965 ..DEFAULT_DEVICE_INFO
966 }, format!(
967 "not possible to align {} to {} under usize::MAX",
968 usize::MAX, 2))]
969 fn configs_from_device_info_err(
970 buffer_length: usize,
971 info: DeviceInfo,
972 expected: impl Deref<Target = str>,
973 ) {
974 let config = DerivableConfig { default_buffer_length: buffer_length, ..Default::default() };
975 assert_matches!(
976 info.make_config(config),
977 Err(Error::Config(got)) if got.as_str() == expected.deref()
978 );
979 }
980
981 #[test_case(DeviceInfo {
982 base_info: DeviceBaseInfo {
983 min_rx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 + 1,
984 ..DEFAULT_DEVICE_BASE_INFO
985 },
986 ..DEFAULT_DEVICE_INFO
987 }, DEFAULT_BUFFER_LENGTH + 1; "default below min")]
988 #[test_case(DeviceInfo {
989 base_info: DeviceBaseInfo {
990 max_buffer_length: NonZeroU32::new(DEFAULT_BUFFER_LENGTH as u32 - 1),
991 ..DEFAULT_DEVICE_BASE_INFO
992 },
993 ..DEFAULT_DEVICE_INFO
994 }, DEFAULT_BUFFER_LENGTH - 1; "default above max")]
995 #[test_case(DeviceInfo {
996 base_info: DeviceBaseInfo {
997 min_rx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 - 1,
998 max_buffer_length: NonZeroU32::new(DEFAULT_BUFFER_LENGTH as u32 + 1),
999 ..DEFAULT_DEVICE_BASE_INFO
1000 },
1001 ..DEFAULT_DEVICE_INFO
1002 }, DEFAULT_BUFFER_LENGTH; "default in bounds")]
1003 fn configs_from_device_buffer_length(info: DeviceInfo, expected_length: usize) {
1004 let config = info
1005 .make_config(DerivableConfig {
1006 default_buffer_length: DEFAULT_BUFFER_LENGTH,
1007 ..Default::default()
1008 })
1009 .expect("is valid");
1010 let Config {
1011 buffer_layout: BufferLayout { length, min_tx_data: _, min_tx_head: _, min_tx_tail: _ },
1012 buffer_stride: _,
1013 num_rx_buffers: _,
1014 num_tx_buffers: _,
1015 options: _,
1016 } = config;
1017 assert_eq!(length, expected_length);
1018 }
1019
1020 fn make_fifos<K: AllocKind>() -> (Fifo<DescId<K>>, zx::Fifo<DescId<K>>) {
1021 let (handle, other_end) = zx::Fifo::create(1).unwrap();
1022 (Fifo::from_fifo(handle), other_end)
1023 }
1024
1025 fn remove_rights<T: FromBytes + IntoBytes + Immutable>(
1026 fifo: Fifo<T>,
1027 rights_to_remove: zx::Rights,
1028 ) -> Fifo<T> {
1029 let fifo = zx::Fifo::from(fifo);
1030 let rights = fifo.as_handle_ref().basic_info().expect("can retrieve info").rights;
1031
1032 let fifo = fifo.replace_handle(rights ^ rights_to_remove).expect("can replace");
1033 Fifo::from_fifo(fifo)
1034 }
1035
1036 enum TxOrRx {
1037 Tx,
1038 Rx,
1039 }
1040 #[test_case(TxOrRx::Tx, zx::Rights::READ; "tx read")]
1041 #[test_case(TxOrRx::Tx, zx::Rights::WRITE; "tx write")]
1042 #[test_case(TxOrRx::Rx, zx::Rights::WRITE; "rx read")]
1043 #[fuchsia_async::run_singlethreaded(test)]
1044 async fn task_as_future_poll_error(which_fifo: TxOrRx, right_to_remove: zx::Rights) {
1045 let (pool, _descriptors, _data) = Pool::new(
1053 DEFAULT_DEVICE_INFO
1054 .make_config(DerivableConfig {
1055 default_buffer_length: DEFAULT_BUFFER_LENGTH,
1056 ..Default::default()
1057 })
1058 .expect("is valid"),
1059 )
1060 .expect("is valid");
1061 let (session_proxy, _session_server) =
1062 fidl::endpoints::create_proxy::<fidl_fuchsia_hardware_network::SessionMarker>();
1063
1064 let (rx, _rx_sender) = make_fifos();
1065 let (tx, _tx_receiver) = make_fifos();
1066
1067 let (tx, rx) = match which_fifo {
1069 TxOrRx::Tx => (remove_rights(tx, right_to_remove), rx),
1070 TxOrRx::Rx => (tx, remove_rights(rx, right_to_remove)),
1071 };
1072
1073 let buf = pool.alloc_tx_buffer(1).await.expect("can allocate");
1074 let inner = Arc::new(Inner {
1075 pool,
1076 proxy: session_proxy,
1077 name: "fake_task".to_string(),
1078 rx,
1079 tx,
1080 tx_pending: Pending::new(vec![]),
1081 rx_ready: Mutex::new(ReadyBuffer::new(10)),
1082 tx_ready: Mutex::new(ReadyBuffer::new(10)),
1083 tx_idle_listeners: TxIdleListeners::new(),
1084 });
1085
1086 inner.send(buf);
1087
1088 let mut task = Task { inner };
1089
1090 assert_matches!(futures::poll!(&mut task), Poll::Ready(Err(Error::Fifo(_, _, _))));
1093 }
1094}