1mod buffer;
8
9use std::fmt::Debug;
10use std::mem::MaybeUninit;
11use std::num::{NonZeroU16, NonZeroU32, NonZeroU64, TryFromIntError};
12use std::ops::Range;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::sync::atomic::{self, AtomicUsize};
16use std::task::Waker;
17
18use explicit::{PollExt as _, ResultExt as _};
19use fidl_fuchsia_hardware_network as netdev;
20use fidl_fuchsia_hardware_network::DelegatedRxLease;
21use fidl_table_validation::ValidFidlTable;
22use fuchsia_async as fasync;
23use fuchsia_sync::Mutex;
24use futures::future::{Future, poll_fn};
25use futures::task::{Context, Poll};
26use futures::{Stream, StreamExt as _, ready};
27
28use crate::error::{Error, Result};
29use buffer::pool::{Pool, RxLeaseWatcher};
30use buffer::{
31 AllocKind, DescId, NETWORK_DEVICE_DESCRIPTOR_LENGTH, NETWORK_DEVICE_DESCRIPTOR_VERSION,
32};
33pub use buffer::{Buffer, Rx, SinglePartTxBuffer, Tx};
34
35#[derive(Clone)]
37pub struct Session {
38 inner: Arc<Inner>,
39}
40
41impl Debug for Session {
42 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43 let Self { inner } = self;
44 let Inner {
45 name,
46 pool: _,
47 proxy: _,
48 rx: _,
49 tx: _,
50 tx_pending: _,
51 rx_ready: _,
52 tx_ready: _,
53 tx_idle_listeners: _,
54 } = &**inner;
55 f.debug_struct("Session").field("name", &name).finish_non_exhaustive()
56 }
57}
58
59impl Session {
60 pub async fn new(
62 device: &netdev::DeviceProxy,
63 name: &str,
64 config: Config,
65 ) -> Result<(Self, Task)> {
66 let inner = Inner::new(device, name, config).await?;
67 Ok((Session { inner: Arc::clone(&inner) }, Task { inner }))
68 }
69
70 pub fn send(&self, buffer: Buffer<Tx>) {
72 self.inner.send(buffer)
73 }
74
75 pub async fn recv(&self) -> Result<Buffer<Rx>> {
77 self.inner.recv().await
78 }
79
80 pub async fn alloc_tx_buffer(&self, num_bytes: usize) -> Result<Buffer<Tx>> {
84 self.inner.pool.alloc_tx_buffer(num_bytes).await
85 }
86
87 pub fn try_alloc_single_part_tx_buffer(
92 &self,
93 num_bytes: usize,
94 ) -> Result<Option<SinglePartTxBuffer>> {
95 self.inner.pool.try_alloc_single_part_tx_buffer(num_bytes)
96 }
97
98 pub async fn alloc_tx_buffers(
111 &self,
112 num_bytes: usize,
113 ) -> Result<impl Iterator<Item = Result<Buffer<Tx>>> + '_> {
114 self.inner.pool.alloc_tx_buffers(num_bytes).await
115 }
116
117 pub async fn attach(&self, port: Port, rx_frames: &[netdev::FrameType]) -> Result<()> {
119 let fut = self.inner.proxy.attach(&port.into(), rx_frames);
126 let () = fut.await?.map_err(|raw| Error::Attach(port, zx::Status::from_raw(raw)))?;
127 Ok(())
128 }
129
130 pub async fn detach(&self, port: Port) -> Result<()> {
132 let () = self
133 .inner
134 .proxy
135 .detach(&port.into())
136 .await?
137 .map_err(|raw| Error::Detach(port, zx::Status::from_raw(raw)))?;
138 Ok(())
139 }
140
141 pub async fn wait_tx_idle(&self) {
154 self.inner.tx_idle_listeners.wait().await;
155 }
156
157 pub fn watch_rx_leases(&self) -> impl Stream<Item = Result<RxLease>> + Send + Sync + use<> {
172 let inner = Arc::clone(&self.inner);
173 let watcher = RxLeaseWatcher::new(Arc::clone(&inner.pool));
174 futures::stream::try_unfold((inner, watcher), |(inner, mut watcher)| async move {
175 let DelegatedRxLease {
176 hold_until_frame,
177 handle,
178 __source_breaking: fidl::marker::SourceBreaking,
179 } = match inner.proxy.watch_delegated_rx_lease().await {
180 Ok(lease) => lease,
181 Err(e) => {
182 if e.is_closed() {
183 return Ok(None);
184 } else {
185 return Err(Error::Fidl(e));
186 }
187 }
188 };
189 let hold_until_frame = hold_until_frame.ok_or(Error::InvalidLease)?;
190 let handle = RxLease { handle: handle.ok_or(Error::InvalidLease)? };
191
192 watcher.wait_until(hold_until_frame).await;
193 Ok(Some((handle, (inner, watcher))))
194 })
195 }
196
197 pub async fn close(&self) -> Result<()> {
199 self.inner.proxy.close()?;
200 let mut event_stream = self.inner.proxy.take_event_stream();
202 while let Some(event) = event_stream.next().await {
203 match event {
204 Err(fidl::Error::ClientChannelClosed { .. }) => break,
205 Ok(_) => {} Err(e) => return Err(Error::Fidl(e)),
207 }
208 }
209 Ok(())
210 }
211}
212
213struct Inner {
214 pool: Arc<Pool>,
215 proxy: netdev::SessionProxy,
216 name: String,
217 rx: fasync::Fifo<DescId<Rx>>,
218 tx: fasync::Fifo<DescId<Tx>>,
219 tx_pending: Pending<Tx>,
221 rx_ready: Mutex<ReadyBuffer<DescId<Rx>>>,
222 tx_ready: Mutex<ReadyBuffer<DescId<Tx>>>,
223 tx_idle_listeners: TxIdleListeners,
224}
225
226impl Inner {
227 async fn new(device: &netdev::DeviceProxy, name: &str, config: Config) -> Result<Arc<Self>> {
229 let (pool, descriptors, data) = Pool::new(config)?;
230
231 let session_info = {
232 let descriptor_length =
235 u8::try_from(NETWORK_DEVICE_DESCRIPTOR_LENGTH / std::mem::size_of::<u64>())
236 .expect("descriptor length in 64-bit words not representable by u8");
237 let data = vec![fidl_fuchsia_hardware_network::DataVmo {
238 id: Some(0),
239 vmo: Some(data),
240 num_rx_buffers: Some(config.num_rx_buffers.get()),
241 __source_breaking: fidl::marker::SourceBreaking,
242 }];
243 netdev::SessionInfo {
244 descriptors: Some(descriptors),
245 data: Some(data),
246 descriptor_version: Some(NETWORK_DEVICE_DESCRIPTOR_VERSION),
247 descriptor_length: Some(descriptor_length),
248 descriptor_count: Some(config.num_tx_buffers.get() + config.num_rx_buffers.get()),
249 options: Some(config.options),
250 ..Default::default()
251 }
252 };
253
254 let (client, netdev::Fifos { rx, tx }) = device
255 .open_session(name, session_info)
256 .await?
257 .map_err(|raw| Error::Open(name.to_owned(), zx::Status::from_raw(raw)))?;
258 let proxy = client.into_proxy();
259 let rx = fasync::Fifo::from_fifo(rx);
260 let tx = fasync::Fifo::from_fifo(tx);
261
262 Ok(Arc::new(Self {
263 pool,
264 proxy,
265 name: name.to_owned(),
266 rx,
267 tx,
268 tx_pending: Pending::new(Vec::new()),
269 rx_ready: Mutex::new(ReadyBuffer::new(config.num_rx_buffers.get().into())),
270 tx_ready: Mutex::new(ReadyBuffer::new(config.num_tx_buffers.get().into())),
271 tx_idle_listeners: TxIdleListeners::new(),
272 }))
273 }
274
275 fn poll_submit_rx(&self, cx: &mut Context<'_>) -> Poll<Result<usize>> {
279 self.pool.rx_pending.poll_submit(&self.rx, cx)
280 }
281
282 fn poll_complete_rx(&self, cx: &mut Context<'_>) -> Poll<Result<DescId<Rx>>> {
286 let mut rx_ready = self.rx_ready.lock();
287 rx_ready.poll_with_fifo(cx, &self.rx).map_err(|status| Error::Fifo("read", "rx", status))
288 }
289
290 fn poll_submit_tx(&self, cx: &mut Context<'_>) -> Poll<Result<usize>> {
294 self.tx_pending.poll_submit(&self.tx, cx)
295 }
296
297 fn poll_complete_tx(&self, cx: &mut Context<'_>) -> Poll<Result<()>> {
299 let result = {
300 let mut tx_ready = self.tx_ready.lock();
301 tx_ready.poll_with_fifo(cx, &self.tx).map(|r| match r {
305 Ok(desc) => self.pool.tx_completed(desc),
306 Err(status) => Err(Error::Fifo("read", "tx", status)),
307 })
308 };
309
310 match &result {
311 Poll::Ready(Ok(())) => self.tx_idle_listeners.tx_complete(),
312 Poll::Pending | Poll::Ready(Err(_)) => {}
313 }
314 result
315 }
316
317 fn send(&self, buffer: Buffer<Tx>) {
324 self.tx_idle_listeners.tx_started();
325 self.tx_pending.extend(std::iter::once(buffer.leak()));
326 }
327
328 async fn recv(&self) -> Result<Buffer<Rx>> {
332 poll_fn(|cx| -> Poll<Result<Buffer<Rx>>> {
333 let head = ready!(self.poll_complete_rx(cx))?;
334 Poll::Ready(self.pool.rx_completed(head))
335 })
336 .await
337 }
338}
339
340#[must_use = "futures do nothing unless you `.await` or poll them"]
344pub struct Task {
345 inner: Arc<Inner>,
346}
347
348impl Future for Task {
349 type Output = Result<()>;
350 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
351 let inner = &Pin::into_inner(self).inner;
352 loop {
353 let mut all_pending = true;
354 while inner.poll_complete_tx(cx)?.is_ready_checked::<()>() {
357 all_pending = false;
358 }
359 if inner.poll_submit_rx(cx)?.is_ready_checked::<usize>() {
360 all_pending = false;
361 }
362 if inner.poll_submit_tx(cx)?.is_ready_checked::<usize>() {
363 all_pending = false;
364 }
365 if all_pending {
366 return Poll::Pending;
367 }
368 }
369 }
370}
371
372#[derive(Debug, Clone, Copy)]
374pub struct Config {
375 buffer_stride: NonZeroU64,
377 num_rx_buffers: NonZeroU16,
379 num_tx_buffers: NonZeroU16,
381 options: netdev::SessionFlags,
383 buffer_layout: BufferLayout,
385}
386
387#[derive(Debug, Clone, Copy)]
389struct BufferLayout {
390 min_tx_data: usize,
392 min_tx_head: u16,
394 min_tx_tail: u16,
396 length: usize,
398}
399
400#[derive(Debug, Clone, ValidFidlTable)]
402#[fidl_table_src(netdev::DeviceBaseInfo)]
403#[fidl_table_strict]
404pub struct DeviceBaseInfo {
405 pub rx_depth: u16,
407 pub tx_depth: u16,
409 pub buffer_alignment: u32,
411 #[fidl_field_type(optional)]
413 pub max_buffer_length: Option<NonZeroU32>,
414 pub min_rx_buffer_length: u32,
416 pub min_tx_buffer_length: u32,
418 pub min_tx_buffer_head: u16,
420 pub min_tx_buffer_tail: u16,
422 pub max_buffer_parts: u8,
424 #[fidl_field_type(optional)]
427 pub min_rx_buffers: Option<NonZeroU16>,
428}
429
430#[derive(Debug, Clone, ValidFidlTable)]
432#[fidl_table_src(netdev::DeviceInfo)]
433#[fidl_table_strict]
434pub struct DeviceInfo {
435 pub min_descriptor_length: u8,
437 pub descriptor_version: u8,
439 pub base_info: DeviceBaseInfo,
441}
442
443#[derive(Debug, Copy, Clone)]
446pub struct DerivableConfig {
447 pub default_buffer_length: usize,
449 pub watch_rx_leases: bool,
451}
452
453impl DerivableConfig {
454 pub const DEFAULT_BUFFER_LENGTH: usize = 2048;
461 pub const DEFAULT: Self =
463 Self { default_buffer_length: Self::DEFAULT_BUFFER_LENGTH, watch_rx_leases: false };
464}
465
466impl Default for DerivableConfig {
467 fn default() -> Self {
468 Self::DEFAULT
469 }
470}
471
472impl DeviceInfo {
473 pub fn make_config(&self, config: DerivableConfig) -> Result<Config> {
478 let DeviceInfo {
479 min_descriptor_length,
480 descriptor_version,
481 base_info:
482 DeviceBaseInfo {
483 rx_depth,
484 tx_depth,
485 buffer_alignment,
486 max_buffer_length,
487 min_rx_buffer_length,
488 min_tx_buffer_length,
489 min_tx_buffer_head,
490 min_tx_buffer_tail,
491 max_buffer_parts: _,
492 min_rx_buffers: _,
493 },
494 } = self;
495 if NETWORK_DEVICE_DESCRIPTOR_VERSION != *descriptor_version {
496 return Err(Error::Config(format!(
497 "descriptor version mismatch: {} != {}",
498 NETWORK_DEVICE_DESCRIPTOR_VERSION, descriptor_version
499 )));
500 }
501 if NETWORK_DEVICE_DESCRIPTOR_LENGTH < usize::from(*min_descriptor_length) {
502 return Err(Error::Config(format!(
503 "descriptor length too small: {} < {}",
504 NETWORK_DEVICE_DESCRIPTOR_LENGTH, min_descriptor_length
505 )));
506 }
507
508 let DerivableConfig { default_buffer_length, watch_rx_leases } = config;
509
510 let num_rx_buffers =
511 NonZeroU16::new(*rx_depth).ok_or_else(|| Error::Config("no RX buffers".to_owned()))?;
512 let num_tx_buffers =
513 NonZeroU16::new(*tx_depth).ok_or_else(|| Error::Config("no TX buffers".to_owned()))?;
514
515 let max_buffer_length = max_buffer_length
516 .and_then(|max| {
517 usize::try_from(max.get()).ok_checked::<TryFromIntError>()
521 })
522 .unwrap_or(usize::MAX);
523 let min_buffer_length = usize::try_from(*min_rx_buffer_length)
524 .ok_checked::<TryFromIntError>()
525 .unwrap_or(usize::MAX);
526
527 let buffer_length =
528 usize::min(max_buffer_length, usize::max(min_buffer_length, default_buffer_length));
529
530 let buffer_alignment = usize::try_from(*buffer_alignment).map_err(
531 |std::num::TryFromIntError { .. }| {
532 Error::Config(format!(
533 "buffer_alignment not representable within usize: {}",
534 buffer_alignment,
535 ))
536 },
537 )?;
538
539 let buffer_stride = buffer_length
540 .checked_add(buffer_alignment - 1)
541 .map(|x| x / buffer_alignment * buffer_alignment)
542 .ok_or_else(|| {
543 Error::Config(format!(
544 "not possible to align {} to {} under usize::MAX",
545 buffer_length, buffer_alignment,
546 ))
547 })?;
548
549 if buffer_stride < buffer_length {
550 return Err(Error::Config(format!(
551 "buffer stride too small {} < {}",
552 buffer_stride, buffer_length
553 )));
554 }
555
556 if buffer_length < usize::from(*min_tx_buffer_head) + usize::from(*min_tx_buffer_tail) {
557 return Err(Error::Config(format!(
558 "buffer length {} does not meet minimum tx buffer head/tail requirement {}/{}",
559 buffer_length, min_tx_buffer_head, min_tx_buffer_tail,
560 )));
561 }
562
563 let num_buffers =
564 rx_depth.checked_add(*tx_depth).filter(|num| *num != u16::MAX).ok_or_else(|| {
565 Error::Config(format!(
566 "too many buffers requested: {} + {} > u16::MAX",
567 rx_depth, tx_depth
568 ))
569 })?;
570
571 let buffer_stride =
572 u64::try_from(buffer_stride).map_err(|std::num::TryFromIntError { .. }| {
573 Error::Config(format!("buffer_stride too big: {} > u64::MAX", buffer_stride))
574 })?;
575
576 match buffer_stride.checked_mul(num_buffers.into()).map(isize::try_from) {
580 None | Some(Err(std::num::TryFromIntError { .. })) => {
581 return Err(Error::Config(format!(
582 "too much memory required for the buffers: {} * {} > isize::MAX",
583 buffer_stride, num_buffers
584 )));
585 }
586 Some(Ok(_total)) => (),
587 };
588
589 let buffer_stride = NonZeroU64::new(buffer_stride)
590 .ok_or_else(|| Error::Config("buffer_stride is zero".to_owned()))?;
591
592 let min_tx_data = match usize::try_from(*min_tx_buffer_length)
593 .map(|min_tx| (min_tx <= buffer_length).then_some(min_tx))
594 {
595 Ok(Some(min_tx_buffer_length)) => min_tx_buffer_length,
596 Ok(None) | Err(std::num::TryFromIntError { .. }) => {
598 return Err(Error::Config(format!(
599 "buffer_length smaller than minimum TX requirement: {} < {}",
600 buffer_length, *min_tx_buffer_length
601 )));
602 }
603 };
604
605 let mut options = netdev::SessionFlags::empty();
606 options.set(netdev::SessionFlags::RECEIVE_RX_POWER_LEASES, watch_rx_leases);
607
608 Ok(Config {
609 buffer_stride,
610 num_rx_buffers,
611 num_tx_buffers,
612 options,
613 buffer_layout: BufferLayout {
614 length: buffer_length,
615 min_tx_head: *min_tx_buffer_head,
616 min_tx_tail: *min_tx_buffer_tail,
617 min_tx_data,
618 },
619 })
620 }
621}
622
623#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
625pub struct Port {
626 pub(crate) base: u8,
627 pub(crate) salt: u8,
628}
629
630impl TryFrom<netdev::PortId> for Port {
631 type Error = Error;
632 fn try_from(netdev::PortId { base, salt }: netdev::PortId) -> Result<Self> {
633 if base <= netdev::MAX_PORTS {
634 Ok(Self { base, salt })
635 } else {
636 Err(Error::InvalidPortId(base))
637 }
638 }
639}
640
641impl From<Port> for netdev::PortId {
642 fn from(Port { base, salt }: Port) -> Self {
643 Self { base, salt }
644 }
645}
646
647struct Pending<K: AllocKind> {
649 inner: Mutex<(Vec<DescId<K>>, Option<Waker>)>,
650}
651
652impl<K: AllocKind> Pending<K> {
653 fn new(descs: Vec<DescId<K>>) -> Self {
654 Self { inner: Mutex::new((descs, None)) }
655 }
656
657 fn extend(&self, descs: impl IntoIterator<Item = DescId<K>>) {
659 let mut guard = self.inner.lock();
660 let (storage, waker) = &mut *guard;
661 storage.extend(descs);
662 if let Some(waker) = waker.take() {
663 waker.wake();
664 }
665 }
666
667 fn poll_submit(
673 &self,
674 fifo: &fasync::Fifo<DescId<K>>,
675 cx: &mut Context<'_>,
676 ) -> Poll<Result<usize>> {
677 let mut guard = self.inner.lock();
678 let (storage, waker) = &mut *guard;
679 if storage.is_empty() {
680 *waker = Some(cx.waker().clone());
681 return Poll::Pending;
682 }
683
684 let submitted = ready!(fifo.try_write(cx, &storage[..]))
689 .map_err(|status| Error::Fifo("write", K::REFL.as_str(), status))?;
690 let _drained = storage.drain(0..submitted);
691 Poll::Ready(Ok(submitted))
692 }
693}
694
695struct ReadyBuffer<T> {
702 data: Vec<MaybeUninit<T>>,
708 available: Range<usize>,
709}
710
711impl<T> Drop for ReadyBuffer<T> {
712 fn drop(&mut self) {
713 let Self { data, available } = self;
714 for initialized in &mut data[available.clone()] {
715 unsafe { initialized.assume_init_drop() }
718 }
719 *available = 0..0;
720 }
721}
722
723impl<T> ReadyBuffer<T> {
724 fn new(capacity: usize) -> Self {
725 let data = std::iter::from_fn(|| Some(MaybeUninit::uninit())).take(capacity).collect();
726 Self { data, available: 0..0 }
727 }
728
729 fn poll_with_fifo(
730 &mut self,
731 cx: &mut Context<'_>,
732 fifo: &fuchsia_async::Fifo<T>,
733 ) -> Poll<std::result::Result<T, zx::Status>>
734 where
735 T: fasync::FifoEntry,
736 {
737 let Self { data, available: Range { start, end } } = self;
738
739 loop {
740 if *start != *end {
742 let desc = std::mem::replace(&mut data[*start], MaybeUninit::uninit());
743 *start += 1;
744 let desc = unsafe { desc.assume_init() };
747 return Poll::Ready(Ok(desc));
748 }
749 let count = ready!(fifo.try_read(cx, &mut data[..]))?;
751 *start = 0;
752 *end = count;
753 }
754 }
755}
756
757struct TxIdleListeners {
758 event: event_listener::Event,
759 tx_in_flight: AtomicUsize,
760}
761
762impl TxIdleListeners {
763 fn new() -> Self {
764 Self { event: event_listener::Event::new(), tx_in_flight: AtomicUsize::new(0) }
765 }
766
767 fn tx_complete(&self) {
771 let Self { event, tx_in_flight } = self;
772 let old_value = tx_in_flight.fetch_sub(1, atomic::Ordering::SeqCst);
773 debug_assert_ne!(old_value, 0);
774 if old_value == 1 {
775 let _notified: usize = event.notify(usize::MAX);
776 }
777 }
778
779 fn tx_started(&self) {
781 let Self { event: _, tx_in_flight } = self;
782 let _: usize = tx_in_flight.fetch_add(1, atomic::Ordering::SeqCst);
783 }
784
785 async fn wait(&self) {
786 let Self { event, tx_in_flight } = self;
787 loop {
796 if tx_in_flight.load(atomic::Ordering::SeqCst) == 0 {
797 return;
798 }
799
800 event_listener::listener!(event => listener);
801
802 if tx_in_flight.load(atomic::Ordering::SeqCst) == 0 {
803 return;
804 }
805
806 listener.await;
807 }
808 }
809}
810
811#[derive(Debug)]
817pub struct RxLease {
818 handle: netdev::DelegatedRxLeaseHandle,
819}
820
821impl Drop for RxLease {
822 fn drop(&mut self) {
823 let Self { handle } = self;
824 match handle {
827 netdev::DelegatedRxLeaseHandle::Channel(_channel) => {
828 }
830 netdev::DelegatedRxLeaseHandle::Eventpair(_eventpair) => {
831 }
833 netdev::DelegatedRxLeaseHandle::__SourceBreaking { .. } => {}
834 }
835 }
836}
837
838impl RxLease {
839 pub fn inner(&self) -> &netdev::DelegatedRxLeaseHandle {
841 &self.handle
842 }
843}
844
845#[cfg(test)]
846mod tests {
847 use std::num::NonZeroU32;
848 use std::ops::Deref;
849 use std::sync::Arc;
850 use std::task::Poll;
851
852 use assert_matches::assert_matches;
853 use fuchsia_async::Fifo;
854 use test_case::test_case;
855 use zerocopy::{FromBytes, Immutable, IntoBytes};
856
857 use crate::session::DerivableConfig;
858
859 use super::buffer::{
860 AllocKind, DescId, NETWORK_DEVICE_DESCRIPTOR_LENGTH, NETWORK_DEVICE_DESCRIPTOR_VERSION,
861 };
862 use super::{
863 BufferLayout, Config, DeviceBaseInfo, DeviceInfo, Error, Inner, Mutex, Pending, Pool,
864 ReadyBuffer, Task, TxIdleListeners,
865 };
866
867 const DEFAULT_DEVICE_BASE_INFO: DeviceBaseInfo = DeviceBaseInfo {
868 rx_depth: 1,
869 tx_depth: 1,
870 buffer_alignment: 1,
871 max_buffer_length: None,
872 min_rx_buffer_length: 0,
873 min_tx_buffer_head: 0,
874 min_tx_buffer_length: 0,
875 min_tx_buffer_tail: 0,
876 max_buffer_parts: fidl_fuchsia_hardware_network::MAX_DESCRIPTOR_CHAIN,
877 min_rx_buffers: None,
878 };
879
880 const DEFAULT_DEVICE_INFO: DeviceInfo = DeviceInfo {
881 min_descriptor_length: 0,
882 descriptor_version: 1,
883 base_info: DEFAULT_DEVICE_BASE_INFO,
884 };
885
886 const DEFAULT_BUFFER_LENGTH: usize = 2048;
887
888 #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
889 min_descriptor_length: u8::MAX,
890 ..DEFAULT_DEVICE_INFO
891 }, format!("descriptor length too small: {} < {}", NETWORK_DEVICE_DESCRIPTOR_LENGTH, u8::MAX))]
892 #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
893 descriptor_version: 42,
894 ..DEFAULT_DEVICE_INFO
895 }, format!("descriptor version mismatch: {} != {}", NETWORK_DEVICE_DESCRIPTOR_VERSION, 42))]
896 #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
897 base_info: DeviceBaseInfo {
898 tx_depth: 0,
899 ..DEFAULT_DEVICE_BASE_INFO
900 },
901 ..DEFAULT_DEVICE_INFO
902 }, "no TX buffers")]
903 #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
904 base_info: DeviceBaseInfo {
905 rx_depth: 0,
906 ..DEFAULT_DEVICE_BASE_INFO
907 },
908 ..DEFAULT_DEVICE_INFO
909 }, "no RX buffers")]
910 #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
911 base_info: DeviceBaseInfo {
912 tx_depth: u16::MAX,
913 rx_depth: u16::MAX,
914 ..DEFAULT_DEVICE_BASE_INFO
915 },
916 ..DEFAULT_DEVICE_INFO
917 }, format!("too many buffers requested: {} + {} > u16::MAX", u16::MAX, u16::MAX))]
918 #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
919 base_info: DeviceBaseInfo {
920 min_tx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 + 1,
921 ..DEFAULT_DEVICE_BASE_INFO
922 },
923 ..DEFAULT_DEVICE_INFO
924 }, format!(
925 "buffer_length smaller than minimum TX requirement: {} < {}",
926 DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
927 #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
928 base_info: DeviceBaseInfo {
929 min_tx_buffer_head: DEFAULT_BUFFER_LENGTH as u16 + 1,
930 ..DEFAULT_DEVICE_BASE_INFO
931 },
932 ..DEFAULT_DEVICE_INFO
933 }, format!(
934 "buffer length {} does not meet minimum tx buffer head/tail requirement {}/0",
935 DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
936 #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
937 base_info: DeviceBaseInfo {
938 min_tx_buffer_tail: DEFAULT_BUFFER_LENGTH as u16 + 1,
939 ..DEFAULT_DEVICE_BASE_INFO
940 },
941 ..DEFAULT_DEVICE_INFO
942 }, format!(
943 "buffer length {} does not meet minimum tx buffer head/tail requirement 0/{}",
944 DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
945 #[test_case(0, DEFAULT_DEVICE_INFO, "buffer_stride is zero")]
946 #[test_case(usize::MAX, DEFAULT_DEVICE_INFO,
947 format!(
948 "too much memory required for the buffers: {} * {} > isize::MAX",
949 usize::MAX, 2))]
950 #[test_case(usize::MAX, DeviceInfo {
951 base_info: DeviceBaseInfo {
952 buffer_alignment: 2,
953 ..DEFAULT_DEVICE_BASE_INFO
954 },
955 ..DEFAULT_DEVICE_INFO
956 }, format!(
957 "not possible to align {} to {} under usize::MAX",
958 usize::MAX, 2))]
959 fn configs_from_device_info_err(
960 buffer_length: usize,
961 info: DeviceInfo,
962 expected: impl Deref<Target = str>,
963 ) {
964 let config = DerivableConfig { default_buffer_length: buffer_length, ..Default::default() };
965 assert_matches!(
966 info.make_config(config),
967 Err(Error::Config(got)) if got.as_str() == expected.deref()
968 );
969 }
970
971 #[test_case(DeviceInfo {
972 base_info: DeviceBaseInfo {
973 min_rx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 + 1,
974 ..DEFAULT_DEVICE_BASE_INFO
975 },
976 ..DEFAULT_DEVICE_INFO
977 }, DEFAULT_BUFFER_LENGTH + 1; "default below min")]
978 #[test_case(DeviceInfo {
979 base_info: DeviceBaseInfo {
980 max_buffer_length: NonZeroU32::new(DEFAULT_BUFFER_LENGTH as u32 - 1),
981 ..DEFAULT_DEVICE_BASE_INFO
982 },
983 ..DEFAULT_DEVICE_INFO
984 }, DEFAULT_BUFFER_LENGTH - 1; "default above max")]
985 #[test_case(DeviceInfo {
986 base_info: DeviceBaseInfo {
987 min_rx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 - 1,
988 max_buffer_length: NonZeroU32::new(DEFAULT_BUFFER_LENGTH as u32 + 1),
989 ..DEFAULT_DEVICE_BASE_INFO
990 },
991 ..DEFAULT_DEVICE_INFO
992 }, DEFAULT_BUFFER_LENGTH; "default in bounds")]
993 fn configs_from_device_buffer_length(info: DeviceInfo, expected_length: usize) {
994 let config = info
995 .make_config(DerivableConfig {
996 default_buffer_length: DEFAULT_BUFFER_LENGTH,
997 ..Default::default()
998 })
999 .expect("is valid");
1000 let Config {
1001 buffer_layout: BufferLayout { length, min_tx_data: _, min_tx_head: _, min_tx_tail: _ },
1002 buffer_stride: _,
1003 num_rx_buffers: _,
1004 num_tx_buffers: _,
1005 options: _,
1006 } = config;
1007 assert_eq!(length, expected_length);
1008 }
1009
1010 fn make_fifos<K: AllocKind>() -> (Fifo<DescId<K>>, zx::Fifo<DescId<K>>) {
1011 let (handle, other_end) = zx::Fifo::create(1).unwrap();
1012 (Fifo::from_fifo(handle), other_end)
1013 }
1014
1015 fn remove_rights<T: FromBytes + IntoBytes + Immutable>(
1016 fifo: Fifo<T>,
1017 rights_to_remove: zx::Rights,
1018 ) -> Fifo<T> {
1019 let fifo = zx::Fifo::from(fifo);
1020 let rights = fifo.as_handle_ref().basic_info().expect("can retrieve info").rights;
1021
1022 let fifo = fifo.replace_handle(rights ^ rights_to_remove).expect("can replace");
1023 Fifo::from_fifo(fifo)
1024 }
1025
1026 enum TxOrRx {
1027 Tx,
1028 Rx,
1029 }
1030 #[test_case(TxOrRx::Tx, zx::Rights::READ; "tx read")]
1031 #[test_case(TxOrRx::Tx, zx::Rights::WRITE; "tx write")]
1032 #[test_case(TxOrRx::Rx, zx::Rights::WRITE; "rx read")]
1033 #[fuchsia_async::run_singlethreaded(test)]
1034 async fn task_as_future_poll_error(which_fifo: TxOrRx, right_to_remove: zx::Rights) {
1035 let (pool, _descriptors, _data) = Pool::new(
1043 DEFAULT_DEVICE_INFO
1044 .make_config(DerivableConfig {
1045 default_buffer_length: DEFAULT_BUFFER_LENGTH,
1046 ..Default::default()
1047 })
1048 .expect("is valid"),
1049 )
1050 .expect("is valid");
1051 let (session_proxy, _session_server) =
1052 fidl::endpoints::create_proxy::<fidl_fuchsia_hardware_network::SessionMarker>();
1053
1054 let (rx, _rx_sender) = make_fifos();
1055 let (tx, _tx_receiver) = make_fifos();
1056
1057 let (tx, rx) = match which_fifo {
1059 TxOrRx::Tx => (remove_rights(tx, right_to_remove), rx),
1060 TxOrRx::Rx => (tx, remove_rights(rx, right_to_remove)),
1061 };
1062
1063 let buf = pool.alloc_tx_buffer(1).await.expect("can allocate");
1064 let inner = Arc::new(Inner {
1065 pool,
1066 proxy: session_proxy,
1067 name: "fake_task".to_string(),
1068 rx,
1069 tx,
1070 tx_pending: Pending::new(vec![]),
1071 rx_ready: Mutex::new(ReadyBuffer::new(10)),
1072 tx_ready: Mutex::new(ReadyBuffer::new(10)),
1073 tx_idle_listeners: TxIdleListeners::new(),
1074 });
1075
1076 inner.send(buf);
1077
1078 let mut task = Task { inner };
1079
1080 assert_matches!(futures::poll!(&mut task), Poll::Ready(Err(Error::Fifo(_, _, _))));
1083 }
1084}