1mod buffer;
8
9use std::fmt::Debug;
10use std::mem::MaybeUninit;
11use std::num::{NonZeroU16, NonZeroU32, NonZeroU64, TryFromIntError};
12use std::ops::Range;
13use std::pin::Pin;
14use std::sync::Arc;
15use std::sync::atomic::{self, AtomicUsize};
16use std::task::Waker;
17
18use explicit::{PollExt as _, ResultExt as _};
19use fidl_fuchsia_hardware_network as netdev;
20use fidl_fuchsia_hardware_network::DelegatedRxLease;
21use fidl_table_validation::ValidFidlTable;
22use fuchsia_async as fasync;
23use fuchsia_sync::Mutex;
24use futures::future::{Future, poll_fn};
25use futures::task::{Context, Poll};
26use futures::{Stream, ready};
27
28use crate::error::{Error, Result};
29use buffer::pool::{Pool, RxLeaseWatcher};
30use buffer::{
31 AllocKind, DescId, NETWORK_DEVICE_DESCRIPTOR_LENGTH, NETWORK_DEVICE_DESCRIPTOR_VERSION,
32};
33pub use buffer::{Buffer, Rx, Tx};
34
/// A client session with a network device.
///
/// Cheap to clone: all clones share the same underlying session state
/// through the inner `Arc`.
#[derive(Clone)]
pub struct Session {
    inner: Arc<Inner>,
}
40
impl Debug for Session {
    /// Formats only the session `name`; the remaining fields (FIFOs, pool,
    /// proxy, ...) are deliberately omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { inner } = self;
        // Exhaustive destructure: adding a field to `Inner` forces a
        // conscious decision here about whether to include it in the output.
        let Inner {
            name,
            pool: _,
            proxy: _,
            rx: _,
            tx: _,
            tx_pending: _,
            rx_ready: _,
            tx_ready: _,
            tx_idle_listeners: _,
        } = &**inner;
        f.debug_struct("Session").field("name", &name).finish_non_exhaustive()
    }
}
58
impl Session {
    /// Opens a new session named `name` on `device` with the given `config`.
    ///
    /// Returns the session handle together with a [`Task`] future; the task
    /// must be polled to drive descriptor submission/completion, otherwise
    /// `send`/`recv` make no progress.
    pub async fn new(
        device: &netdev::DeviceProxy,
        name: &str,
        config: Config,
    ) -> Result<(Self, Task)> {
        let inner = Inner::new(device, name, config).await?;
        Ok((Session { inner: Arc::clone(&inner) }, Task { inner }))
    }

    /// Queues `buffer` for transmission.
    ///
    /// Non-blocking: the buffer is enqueued and actually written to the TX
    /// FIFO by the [`Task`]. Fails if the buffer cannot be padded to the
    /// device's minimum TX length.
    pub fn send(&self, buffer: Buffer<Tx>) -> Result<()> {
        self.inner.send(buffer)
    }

    /// Waits for and returns the next received frame.
    pub async fn recv(&self) -> Result<Buffer<Rx>> {
        self.inner.recv().await
    }

    /// Allocates a TX buffer with room for `num_bytes` of payload, waiting
    /// until one is available in the pool.
    pub async fn alloc_tx_buffer(&self, num_bytes: usize) -> Result<Buffer<Tx>> {
        self.inner.pool.alloc_tx_buffer(num_bytes).await
    }

    /// Like [`Session::alloc_tx_buffer`], but returns an iterator that keeps
    /// yielding buffers of `num_bytes` as they become available.
    pub async fn alloc_tx_buffers(
        &self,
        num_bytes: usize,
    ) -> Result<impl Iterator<Item = Result<Buffer<Tx>>> + '_> {
        self.inner.pool.alloc_tx_buffers(num_bytes).await
    }

    /// Attaches the session to `port`, subscribing to the given RX frame
    /// types.
    pub async fn attach(&self, port: Port, rx_frames: &[netdev::FrameType]) -> Result<()> {
        let fut = self.inner.proxy.attach(&port.into(), rx_frames);
        let () = fut.await?.map_err(|raw| Error::Attach(port, zx::Status::from_raw(raw)))?;
        Ok(())
    }

    /// Detaches the session from `port`.
    pub async fn detach(&self, port: Port) -> Result<()> {
        let () = self
            .inner
            .proxy
            .detach(&port.into())
            .await?
            .map_err(|raw| Error::Detach(port, zx::Status::from_raw(raw)))?;
        Ok(())
    }

    /// Waits until no TX frames queued through this session are in flight.
    pub async fn wait_tx_idle(&self) {
        self.inner.tx_idle_listeners.wait().await;
    }

    /// Returns a stream of RX leases delegated by the device.
    ///
    /// Each lease is yielded only after the RX frame it is held until has
    /// been completed (enforced via `RxLeaseWatcher::wait_until`). The stream
    /// ends cleanly when the session channel closes.
    pub fn watch_rx_leases(&self) -> impl Stream<Item = Result<RxLease>> + Send + Sync + use<> {
        let inner = Arc::clone(&self.inner);
        let watcher = RxLeaseWatcher::new(Arc::clone(&inner.pool));
        futures::stream::try_unfold((inner, watcher), |(inner, mut watcher)| async move {
            let DelegatedRxLease {
                hold_until_frame,
                handle,
                __source_breaking: fidl::marker::SourceBreaking,
            } = match inner.proxy.watch_delegated_rx_lease().await {
                Ok(lease) => lease,
                Err(e) => {
                    // A closed channel terminates the stream instead of
                    // surfacing an error.
                    if e.is_closed() {
                        return Ok(None);
                    } else {
                        return Err(Error::Fidl(e));
                    }
                }
            };
            let hold_until_frame = hold_until_frame.ok_or(Error::InvalidLease)?;
            let handle = RxLease { handle: handle.ok_or(Error::InvalidLease)? };

            watcher.wait_until(hold_until_frame).await;
            Ok(Some((handle, (inner, watcher))))
        })
    }
}
186
/// Shared state backing a [`Session`] and its driving [`Task`].
struct Inner {
    // Shared buffer pool backing both RX and TX descriptors.
    pool: Arc<Pool>,
    // FIDL proxy for the session protocol.
    proxy: netdev::SessionProxy,
    // Session name, used for Debug output.
    name: String,
    // FIFO over which the device returns completed RX descriptors.
    rx: fasync::Fifo<DescId<Rx>>,
    // FIFO over which TX descriptors are submitted and completed.
    tx: fasync::Fifo<DescId<Tx>>,
    // TX descriptors queued by `send` but not yet written to the TX FIFO.
    tx_pending: Pending<Tx>,
    // Batch buffers for descriptors read back from the FIFOs.
    rx_ready: Mutex<ReadyBuffer<DescId<Rx>>>,
    tx_ready: Mutex<ReadyBuffer<DescId<Tx>>>,
    // Notifies waiters when the number of in-flight TX frames drops to zero.
    tx_idle_listeners: TxIdleListeners,
}
199
impl Inner {
    /// Creates the buffer pool, opens the FIDL session, and wraps the
    /// returned FIFOs, producing the shared session state.
    async fn new(device: &netdev::DeviceProxy, name: &str, config: Config) -> Result<Arc<Self>> {
        let (pool, descriptors, data) = Pool::new(config)?;

        let session_info = {
            // The FIDL table expresses descriptor length in 64-bit words,
            // not bytes.
            let descriptor_length =
                u8::try_from(NETWORK_DEVICE_DESCRIPTOR_LENGTH / std::mem::size_of::<u64>())
                    .expect("descriptor length in 64-bit words not representable by u8");
            netdev::SessionInfo {
                descriptors: Some(descriptors),
                data: Some(data),
                descriptor_version: Some(NETWORK_DEVICE_DESCRIPTOR_VERSION),
                descriptor_length: Some(descriptor_length),
                // Sum cannot overflow: `make_config` rejects depths whose sum
                // doesn't fit in u16.
                descriptor_count: Some(config.num_tx_buffers.get() + config.num_rx_buffers.get()),
                options: Some(config.options),
                ..Default::default()
            }
        };

        let (client, netdev::Fifos { rx, tx }) = device
            .open_session(name, session_info)
            .await?
            .map_err(|raw| Error::Open(name.to_owned(), zx::Status::from_raw(raw)))?;
        let proxy = client.into_proxy();
        let rx = fasync::Fifo::from_fifo(rx);
        let tx = fasync::Fifo::from_fifo(tx);

        Ok(Arc::new(Self {
            pool,
            proxy,
            name: name.to_owned(),
            rx,
            tx,
            tx_pending: Pending::new(Vec::new()),
            rx_ready: Mutex::new(ReadyBuffer::new(config.num_rx_buffers.get().into())),
            tx_ready: Mutex::new(ReadyBuffer::new(config.num_tx_buffers.get().into())),
            tx_idle_listeners: TxIdleListeners::new(),
        }))
    }

    /// Writes free RX descriptors from the pool into the RX FIFO. Ready with
    /// the number submitted, or Pending if nothing to submit / FIFO full.
    fn poll_submit_rx(&self, cx: &mut Context<'_>) -> Poll<Result<usize>> {
        self.pool.rx_pending.poll_submit(&self.rx, cx)
    }

    /// Polls for the next completed RX descriptor from the RX FIFO.
    fn poll_complete_rx(&self, cx: &mut Context<'_>) -> Poll<Result<DescId<Rx>>> {
        let mut rx_ready = self.rx_ready.lock();
        rx_ready.poll_with_fifo(cx, &self.rx).map_err(|status| Error::Fifo("read", "rx", status))
    }

    /// Writes pending TX descriptors (queued by `send`) into the TX FIFO.
    fn poll_submit_tx(&self, cx: &mut Context<'_>) -> Poll<Result<usize>> {
        self.tx_pending.poll_submit(&self.tx, cx)
    }

    /// Polls for one completed TX descriptor, returns it to the pool, and
    /// notifies idle listeners of the completion.
    fn poll_complete_tx(&self, cx: &mut Context<'_>) -> Poll<Result<()>> {
        let result = {
            // Lock scope kept tight: drop `tx_ready` before notifying.
            let mut tx_ready = self.tx_ready.lock();
            tx_ready.poll_with_fifo(cx, &self.tx).map(|r| match r {
                Ok(desc) => self.pool.tx_completed(desc),
                Err(status) => Err(Error::Fifo("read", "tx", status)),
            })
        };

        match &result {
            Poll::Ready(Ok(())) => self.tx_idle_listeners.tx_complete(),
            Poll::Pending | Poll::Ready(Err(_)) => {}
        }
        result
    }

    /// Pads and commits `buffer`, then queues its descriptor for the task to
    /// write into the TX FIFO.
    fn send(&self, mut buffer: Buffer<Tx>) -> Result<()> {
        buffer.pad()?;
        buffer.commit();
        // Count the frame as in-flight before it is queued so `wait_tx_idle`
        // cannot observe an idle state while this send is still in progress.
        self.tx_idle_listeners.tx_started();
        self.tx_pending.extend(std::iter::once(buffer.leak()));
        Ok(())
    }

    /// Waits for the next completed RX descriptor and converts it into a
    /// received-frame buffer.
    async fn recv(&self) -> Result<Buffer<Rx>> {
        poll_fn(|cx| -> Poll<Result<Buffer<Rx>>> {
            let head = ready!(self.poll_complete_rx(cx))?;
            Poll::Ready(self.pool.rx_completed(head))
        })
        .await
    }
}
305
/// The future that drives a [`Session`]'s descriptor traffic; it must be
/// polled (e.g. spawned) for `send`/`recv` on the session to make progress.
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Task {
    inner: Arc<Inner>,
}
313
314impl Future for Task {
315 type Output = Result<()>;
316 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
317 let inner = &Pin::into_inner(self).inner;
318 loop {
319 let mut all_pending = true;
320 while inner.poll_complete_tx(cx)?.is_ready_checked::<()>() {
323 all_pending = false;
324 }
325 if inner.poll_submit_rx(cx)?.is_ready_checked::<usize>() {
326 all_pending = false;
327 }
328 if inner.poll_submit_tx(cx)?.is_ready_checked::<usize>() {
329 all_pending = false;
330 }
331 if all_pending {
332 return Poll::Pending;
333 }
334 }
335 }
336}
337
/// Session configuration derived from device limits via
/// [`DeviceInfo::make_config`].
#[derive(Debug, Clone, Copy)]
pub struct Config {
    // Distance, in bytes, between consecutive buffers in the data VMO.
    buffer_stride: NonZeroU64,
    // Number of RX descriptors to allocate.
    num_rx_buffers: NonZeroU16,
    // Number of TX descriptors to allocate.
    num_tx_buffers: NonZeroU16,
    // Flags passed to the device when opening the session.
    options: netdev::SessionFlags,
    // Byte layout of each individual buffer.
    buffer_layout: BufferLayout,
}
352
/// Byte layout of a single session buffer.
#[derive(Debug, Clone, Copy)]
struct BufferLayout {
    // Minimum TX payload size required by the device.
    min_tx_data: usize,
    // Reserved head space required for TX buffers, in bytes.
    min_tx_head: u16,
    // Reserved tail space required for TX buffers, in bytes.
    min_tx_tail: u16,
    // Total length of each buffer, in bytes.
    length: usize,
}
365
/// Validated form of [`netdev::DeviceBaseInfo`].
#[derive(Debug, Clone, ValidFidlTable)]
#[fidl_table_src(netdev::DeviceBaseInfo)]
#[fidl_table_strict]
pub struct DeviceBaseInfo {
    /// RX FIFO depth reported by the device.
    pub rx_depth: u16,
    /// TX FIFO depth reported by the device.
    pub tx_depth: u16,
    /// Required alignment for buffer offsets, in bytes.
    pub buffer_alignment: u32,
    /// Maximum buffer length supported by the device, if bounded.
    #[fidl_field_type(optional)]
    pub max_buffer_length: Option<NonZeroU32>,
    /// Minimum length for RX buffers, in bytes.
    pub min_rx_buffer_length: u32,
    /// Minimum length for TX buffers, in bytes.
    pub min_tx_buffer_length: u32,
    /// Required TX buffer head space, in bytes.
    pub min_tx_buffer_head: u16,
    /// Required TX buffer tail space, in bytes.
    pub min_tx_buffer_tail: u16,
    /// Maximum number of descriptor parts a buffer may be chained across.
    pub max_buffer_parts: u8,
    /// RX accelerations supported by the device (empty if absent).
    #[fidl_field_type(default)]
    pub rx_accel: Vec<netdev::RxAcceleration>,
    /// TX accelerations supported by the device (empty if absent).
    #[fidl_field_type(default)]
    pub tx_accel: Vec<netdev::TxAcceleration>,
}
397
/// Validated form of [`netdev::DeviceInfo`].
#[derive(Debug, Clone, ValidFidlTable)]
#[fidl_table_src(netdev::DeviceInfo)]
#[fidl_table_strict]
pub struct DeviceInfo {
    /// Minimum descriptor length the device accepts, in 64-bit words.
    pub min_descriptor_length: u8,
    /// Descriptor format version the device expects.
    pub descriptor_version: u8,
    /// Remaining device limits.
    pub base_info: DeviceBaseInfo,
}
410
/// Caller-chosen knobs used when deriving a [`Config`] from device limits.
#[derive(Debug, Copy, Clone)]
pub struct DerivableConfig {
    /// Preferred buffer length; clamped to the device's min/max bounds.
    pub default_buffer_length: usize,
    /// Whether to open the session as a primary session.
    pub primary: bool,
    /// Whether to request delegated RX power leases.
    pub watch_rx_leases: bool,
}
422
impl DerivableConfig {
    /// Buffer length used when the device imposes no tighter bound.
    pub const DEFAULT_BUFFER_LENGTH: usize = 2048;
    /// Default configuration: primary session, default buffer length, no RX
    /// lease watching. Also returned by `Default::default()`.
    pub const DEFAULT: Self = Self {
        default_buffer_length: Self::DEFAULT_BUFFER_LENGTH,
        primary: true,
        watch_rx_leases: false,
    };
}
438
439impl Default for DerivableConfig {
440 fn default() -> Self {
441 Self::DEFAULT
442 }
443}
444
impl DeviceInfo {
    /// Derives a session [`Config`] from this device's reported limits and
    /// the caller's preferences.
    ///
    /// Returns `Error::Config` when the device limits are incompatible with
    /// each other, with the descriptor format, or with `config`.
    pub fn make_config(&self, config: DerivableConfig) -> Result<Config> {
        // Exhaustive destructure so new FIDL fields must be considered here.
        let DeviceInfo {
            min_descriptor_length,
            descriptor_version,
            base_info:
                DeviceBaseInfo {
                    rx_depth,
                    tx_depth,
                    buffer_alignment,
                    max_buffer_length,
                    min_rx_buffer_length,
                    min_tx_buffer_length,
                    min_tx_buffer_head,
                    min_tx_buffer_tail,
                    max_buffer_parts: _,
                    rx_accel: _,
                    tx_accel: _,
                },
        } = self;
        if NETWORK_DEVICE_DESCRIPTOR_VERSION != *descriptor_version {
            return Err(Error::Config(format!(
                "descriptor version mismatch: {} != {}",
                NETWORK_DEVICE_DESCRIPTOR_VERSION, descriptor_version
            )));
        }
        if NETWORK_DEVICE_DESCRIPTOR_LENGTH < usize::from(*min_descriptor_length) {
            return Err(Error::Config(format!(
                "descriptor length too small: {} < {}",
                NETWORK_DEVICE_DESCRIPTOR_LENGTH, min_descriptor_length
            )));
        }

        let DerivableConfig { default_buffer_length, primary, watch_rx_leases } = config;

        let num_rx_buffers =
            NonZeroU16::new(*rx_depth).ok_or_else(|| Error::Config("no RX buffers".to_owned()))?;
        let num_tx_buffers =
            NonZeroU16::new(*tx_depth).ok_or_else(|| Error::Config("no TX buffers".to_owned()))?;

        // Treat an absent or unrepresentable max as "unbounded".
        let max_buffer_length = max_buffer_length
            .and_then(|max| {
                usize::try_from(max.get()).ok_checked::<TryFromIntError>()
            })
            .unwrap_or(usize::MAX);
        let min_buffer_length = usize::try_from(*min_rx_buffer_length)
            .ok_checked::<TryFromIntError>()
            .unwrap_or(usize::MAX);

        // Clamp the caller's preferred length into [min, max].
        let buffer_length =
            usize::min(max_buffer_length, usize::max(min_buffer_length, default_buffer_length));

        let buffer_alignment = usize::try_from(*buffer_alignment).map_err(
            |std::num::TryFromIntError { .. }| {
                Error::Config(format!(
                    "buffer_alignment not representable within usize: {}",
                    buffer_alignment,
                ))
            },
        )?;

        // Round the length up to the next multiple of the alignment.
        // NOTE(review): `buffer_alignment - 1` would underflow (panic) if the
        // device reported an alignment of 0 — presumably the protocol
        // guarantees a nonzero alignment; confirm.
        let buffer_stride = buffer_length
            .checked_add(buffer_alignment - 1)
            .map(|x| x / buffer_alignment * buffer_alignment)
            .ok_or_else(|| {
                Error::Config(format!(
                    "not possible to align {} to {} under usize::MAX",
                    buffer_length, buffer_alignment,
                ))
            })?;

        if buffer_stride < buffer_length {
            return Err(Error::Config(format!(
                "buffer stride too small {} < {}",
                buffer_stride, buffer_length
            )));
        }

        if buffer_length < usize::from(*min_tx_buffer_head) + usize::from(*min_tx_buffer_tail) {
            return Err(Error::Config(format!(
                "buffer length {} does not meet minimum tx buffer head/tail requirement {}/{}",
                buffer_length, min_tx_buffer_head, min_tx_buffer_tail,
            )));
        }

        // NOTE(review): the filter also rejects a sum exactly equal to
        // u16::MAX even though the message says "> u16::MAX"; the unit tests
        // pin this behavior, so it appears intentional.
        let num_buffers =
            rx_depth.checked_add(*tx_depth).filter(|num| *num != u16::MAX).ok_or_else(|| {
                Error::Config(format!(
                    "too many buffers requested: {} + {} > u16::MAX",
                    rx_depth, tx_depth
                ))
            })?;

        let buffer_stride =
            u64::try_from(buffer_stride).map_err(|std::num::TryFromIntError { .. }| {
                Error::Config(format!("buffer_stride too big: {} > u64::MAX", buffer_stride))
            })?;

        // The total VMO size must fit in isize for safe mapping arithmetic.
        match buffer_stride.checked_mul(num_buffers.into()).map(isize::try_from) {
            None | Some(Err(std::num::TryFromIntError { .. })) => {
                return Err(Error::Config(format!(
                    "too much memory required for the buffers: {} * {} > isize::MAX",
                    buffer_stride, num_buffers
                )));
            }
            Some(Ok(_total)) => (),
        };

        let buffer_stride = NonZeroU64::new(buffer_stride)
            .ok_or_else(|| Error::Config("buffer_stride is zero".to_owned()))?;

        let min_tx_data = match usize::try_from(*min_tx_buffer_length)
            .map(|min_tx| (min_tx <= buffer_length).then_some(min_tx))
        {
            Ok(Some(min_tx_buffer_length)) => min_tx_buffer_length,
            Ok(None) | Err(std::num::TryFromIntError { .. }) => {
                return Err(Error::Config(format!(
                    "buffer_length smaller than minimum TX requirement: {} < {}",
                    buffer_length, *min_tx_buffer_length
                )));
            }
        };

        let mut options = netdev::SessionFlags::empty();
        options.set(netdev::SessionFlags::PRIMARY, primary);
        options.set(netdev::SessionFlags::RECEIVE_RX_POWER_LEASES, watch_rx_leases);

        Ok(Config {
            buffer_stride,
            num_rx_buffers,
            num_tx_buffers,
            options,
            buffer_layout: BufferLayout {
                length: buffer_length,
                min_tx_head: *min_tx_buffer_head,
                min_tx_tail: *min_tx_buffer_tail,
                min_tx_data,
            },
        })
    }
}
597
/// A device port identifier, validated from [`netdev::PortId`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Port {
    // Base port index.
    pub(crate) base: u8,
    // Salt disambiguating reincarnations of the same base index.
    pub(crate) salt: u8,
}
604
impl TryFrom<netdev::PortId> for Port {
    type Error = Error;
    /// Validates the FIDL port identifier's base index.
    // NOTE(review): the check is `<=`, which accepts `base == MAX_PORTS`; if
    // valid bases are `0..MAX_PORTS` this is off by one — confirm against the
    // fuchsia.hardware.network definition of MAX_PORTS.
    fn try_from(netdev::PortId { base, salt }: netdev::PortId) -> Result<Self> {
        if base <= netdev::MAX_PORTS {
            Ok(Self { base, salt })
        } else {
            Err(Error::InvalidPortId(base))
        }
    }
}
615
616impl From<Port> for netdev::PortId {
617 fn from(Port { base, salt }: Port) -> Self {
618 Self { base, salt }
619 }
620}
621
/// A queue of descriptors waiting to be written to a FIFO, plus the waker of
/// the task waiting for the queue to become non-empty.
struct Pending<K: AllocKind> {
    inner: Mutex<(Vec<DescId<K>>, Option<Waker>)>,
}
626
impl<K: AllocKind> Pending<K> {
    /// Creates a queue seeded with `descs`.
    fn new(descs: Vec<DescId<K>>) -> Self {
        Self { inner: Mutex::new((descs, None)) }
    }

    /// Appends descriptors and wakes the task (if any) that observed an
    /// empty queue in `poll_submit`.
    fn extend(&self, descs: impl IntoIterator<Item = DescId<K>>) {
        let mut guard = self.inner.lock();
        let (storage, waker) = &mut *guard;
        storage.extend(descs);
        if let Some(waker) = waker.take() {
            waker.wake();
        }
    }

    /// Attempts to write queued descriptors into `fifo`.
    ///
    /// Pending when the queue is empty (registers `cx`'s waker, woken by
    /// `extend`) or when the FIFO cannot accept writes yet (the FIFO
    /// registers the waker). Otherwise drains the descriptors that were
    /// accepted and returns how many were written.
    fn poll_submit(
        &self,
        fifo: &fasync::Fifo<DescId<K>>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<usize>> {
        let mut guard = self.inner.lock();
        let (storage, waker) = &mut *guard;
        if storage.is_empty() {
            *waker = Some(cx.waker().clone());
            return Poll::Pending;
        }

        // May perform a partial write; unwritten descriptors stay queued.
        let submitted = ready!(fifo.try_write(cx, &storage[..]))
            .map_err(|status| Error::Fifo("write", K::REFL.as_str(), status))?;
        let _drained = storage.drain(0..submitted);
        Poll::Ready(Ok(submitted))
    }
}
669
/// A reusable batch-read buffer for descriptors coming out of a FIFO.
struct ReadyBuffer<T> {
    // Fixed-capacity scratch storage; only `available` holds initialized
    // values.
    data: Vec<MaybeUninit<T>>,
    // Index range within `data` of initialized, not-yet-consumed entries.
    available: Range<usize>,
}
685
impl<T> Drop for ReadyBuffer<T> {
    /// Drops the initialized-but-unconsumed entries; `MaybeUninit` would
    /// otherwise leak them.
    fn drop(&mut self) {
        let Self { data, available } = self;
        for initialized in &mut data[available.clone()] {
            // SAFETY: entries inside `available` were initialized by a FIFO
            // read in `poll_with_fifo` and have not been consumed.
            unsafe { initialized.assume_init_drop() }
        }
        *available = 0..0;
    }
}
697
impl<T> ReadyBuffer<T> {
    /// Creates a buffer with room for `capacity` entries, all uninitialized.
    fn new(capacity: usize) -> Self {
        let data = std::iter::from_fn(|| Some(MaybeUninit::uninit())).take(capacity).collect();
        Self { data, available: 0..0 }
    }

    /// Returns the next available entry, refilling the buffer with a batch
    /// read from `fifo` when it runs dry.
    fn poll_with_fifo(
        &mut self,
        cx: &mut Context<'_>,
        fifo: &fuchsia_async::Fifo<T>,
    ) -> Poll<std::result::Result<T, zx::Status>>
    where
        T: fasync::FifoEntry,
    {
        let Self { data, available: Range { start, end } } = self;

        loop {
            // Hand out the next buffered entry, if any.
            if *start != *end {
                // Swap in an uninit placeholder so the slot is not dropped
                // (or double-read) later.
                let desc = std::mem::replace(&mut data[*start], MaybeUninit::uninit());
                *start += 1;
                // SAFETY: slots in `start..end` were initialized by the last
                // FIFO read and `start` has not passed `end`.
                let desc = unsafe { desc.assume_init() };
                return Poll::Ready(Ok(desc));
            }
            // Buffer empty (start == end): refill from the FIFO. The read
            // overwrites `data` from the front, so reset the range to the
            // count actually read.
            let count = ready!(fifo.try_read(cx, &mut data[..]))?;
            *start = 0;
            *end = count;
        }
    }
}
731
/// Tracks in-flight TX frames and notifies waiters when the count hits zero.
struct TxIdleListeners {
    // Notification channel for idle transitions.
    event: event_listener::Event,
    // Number of TX frames currently in flight.
    tx_in_flight: AtomicUsize,
}
736
impl TxIdleListeners {
    /// Creates a tracker with no frames in flight.
    fn new() -> Self {
        Self { event: event_listener::Event::new(), tx_in_flight: AtomicUsize::new(0) }
    }

    /// Records the completion of one TX frame, notifying all listeners if
    /// the in-flight count dropped to zero.
    fn tx_complete(&self) {
        let Self { event, tx_in_flight } = self;
        let old_value = tx_in_flight.fetch_sub(1, atomic::Ordering::SeqCst);
        // A completion without a matching `tx_started` is a bug.
        debug_assert_ne!(old_value, 0);
        if old_value == 1 {
            let _notified: usize = event.notify(usize::MAX);
        }
    }

    /// Records the start of one TX frame.
    fn tx_started(&self) {
        let Self { event: _, tx_in_flight } = self;
        let _: usize = tx_in_flight.fetch_add(1, atomic::Ordering::SeqCst);
    }

    /// Waits until no TX frames are in flight.
    ///
    /// Uses the check / register-listener / re-check pattern: the second
    /// load after creating the listener ensures a notification fired between
    /// the first check and registration is not lost.
    async fn wait(&self) {
        let Self { event, tx_in_flight } = self;
        loop {
            if tx_in_flight.load(atomic::Ordering::SeqCst) == 0 {
                return;
            }

            event_listener::listener!(event => listener);

            if tx_in_flight.load(atomic::Ordering::SeqCst) == 0 {
                return;
            }

            listener.await;
        }
    }
}
785
/// An RX power lease delegated by the device; held for as long as this value
/// is alive.
#[derive(Debug)]
pub struct RxLease {
    handle: netdev::DelegatedRxLeaseHandle,
}
795
impl Drop for RxLease {
    /// Releases the lease.
    ///
    /// The match is intentionally empty per variant: dropping the underlying
    /// kernel handle (channel/eventpair) is what releases the lease. The
    /// exhaustive match forces this code to be revisited if new handle
    /// variants are added.
    fn drop(&mut self) {
        let Self { handle } = self;
        match handle {
            netdev::DelegatedRxLeaseHandle::Channel(_channel) => {
            }
            netdev::DelegatedRxLeaseHandle::Eventpair(_eventpair) => {
            }
            netdev::DelegatedRxLeaseHandle::__SourceBreaking { .. } => {}
        }
    }
}
812
813impl RxLease {
814 pub fn inner(&self) -> &netdev::DelegatedRxLeaseHandle {
816 &self.handle
817 }
818}
819
#[cfg(test)]
mod tests {
    use std::num::NonZeroU32;
    use std::ops::Deref;
    use std::sync::Arc;
    use std::task::Poll;

    use assert_matches::assert_matches;
    use fuchsia_async::Fifo;
    use test_case::test_case;
    use zerocopy::{FromBytes, Immutable, IntoBytes};
    use zx::HandleBased as _;

    use crate::session::DerivableConfig;

    use super::buffer::{
        AllocKind, DescId, NETWORK_DEVICE_DESCRIPTOR_LENGTH, NETWORK_DEVICE_DESCRIPTOR_VERSION,
    };
    use super::{
        BufferLayout, Config, DeviceBaseInfo, DeviceInfo, Error, Inner, Mutex, Pending, Pool,
        ReadyBuffer, Task, TxIdleListeners,
    };

    // Minimal valid device limits used as a baseline by the config tests.
    const DEFAULT_DEVICE_BASE_INFO: DeviceBaseInfo = DeviceBaseInfo {
        rx_depth: 1,
        tx_depth: 1,
        buffer_alignment: 1,
        max_buffer_length: None,
        min_rx_buffer_length: 0,
        min_tx_buffer_head: 0,
        min_tx_buffer_length: 0,
        min_tx_buffer_tail: 0,
        max_buffer_parts: fidl_fuchsia_hardware_network::MAX_DESCRIPTOR_CHAIN,
        rx_accel: Vec::new(),
        tx_accel: Vec::new(),
    };

    const DEFAULT_DEVICE_INFO: DeviceInfo = DeviceInfo {
        min_descriptor_length: 0,
        descriptor_version: 1,
        base_info: DEFAULT_DEVICE_BASE_INFO,
    };

    const DEFAULT_BUFFER_LENGTH: usize = 2048;

    // Each case feeds an invalid buffer length and/or `DeviceInfo` through
    // `make_config` and pins the exact `Error::Config` message.
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        min_descriptor_length: u8::MAX,
        ..DEFAULT_DEVICE_INFO
    }, format!("descriptor length too small: {} < {}", NETWORK_DEVICE_DESCRIPTOR_LENGTH, u8::MAX))]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        descriptor_version: 42,
        ..DEFAULT_DEVICE_INFO
    }, format!("descriptor version mismatch: {} != {}", NETWORK_DEVICE_DESCRIPTOR_VERSION, 42))]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            tx_depth: 0,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, "no TX buffers")]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            rx_depth: 0,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, "no RX buffers")]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            tx_depth: u16::MAX,
            rx_depth: u16::MAX,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, format!("too many buffers requested: {} + {} > u16::MAX", u16::MAX, u16::MAX))]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            min_tx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 + 1,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, format!(
        "buffer_length smaller than minimum TX requirement: {} < {}",
        DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            min_tx_buffer_head: DEFAULT_BUFFER_LENGTH as u16 + 1,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, format!(
        "buffer length {} does not meet minimum tx buffer head/tail requirement {}/0",
        DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            min_tx_buffer_tail: DEFAULT_BUFFER_LENGTH as u16 + 1,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, format!(
        "buffer length {} does not meet minimum tx buffer head/tail requirement 0/{}",
        DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
    #[test_case(0, DEFAULT_DEVICE_INFO, "buffer_stride is zero")]
    #[test_case(usize::MAX, DEFAULT_DEVICE_INFO,
    format!(
        "too much memory required for the buffers: {} * {} > isize::MAX",
        usize::MAX, 2))]
    #[test_case(usize::MAX, DeviceInfo {
        base_info: DeviceBaseInfo {
            buffer_alignment: 2,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, format!(
        "not possible to align {} to {} under usize::MAX",
        usize::MAX, 2))]
    fn configs_from_device_info_err(
        buffer_length: usize,
        info: DeviceInfo,
        expected: impl Deref<Target = str>,
    ) {
        let config = DerivableConfig { default_buffer_length: buffer_length, ..Default::default() };
        assert_matches!(
            info.make_config(config),
            Err(Error::Config(got)) if got.as_str() == expected.deref()
        );
    }

    // Verifies that the derived buffer length is the default clamped into
    // the device's [min, max] bounds.
    #[test_case(DeviceInfo {
        base_info: DeviceBaseInfo {
            min_rx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 + 1,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, DEFAULT_BUFFER_LENGTH + 1; "default below min")]
    #[test_case(DeviceInfo {
        base_info: DeviceBaseInfo {
            max_buffer_length: NonZeroU32::new(DEFAULT_BUFFER_LENGTH as u32 - 1),
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, DEFAULT_BUFFER_LENGTH - 1; "default above max")]
    #[test_case(DeviceInfo {
        base_info: DeviceBaseInfo {
            min_rx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 - 1,
            max_buffer_length: NonZeroU32::new(DEFAULT_BUFFER_LENGTH as u32 + 1),
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, DEFAULT_BUFFER_LENGTH; "default in bounds")]
    fn configs_from_device_buffer_length(info: DeviceInfo, expected_length: usize) {
        let config = info
            .make_config(DerivableConfig {
                default_buffer_length: DEFAULT_BUFFER_LENGTH,
                ..Default::default()
            })
            .expect("is valid");
        let Config {
            buffer_layout: BufferLayout { length, min_tx_data: _, min_tx_head: _, min_tx_tail: _ },
            buffer_stride: _,
            num_rx_buffers: _,
            num_tx_buffers: _,
            options: _,
        } = config;
        assert_eq!(length, expected_length);
    }

    // Creates a depth-1 FIFO pair: the async side under test plus the raw
    // peer end.
    fn make_fifos<K: AllocKind>() -> (Fifo<DescId<K>>, zx::Fifo<DescId<K>>) {
        let (handle, other_end) = zx::Fifo::create(1).unwrap();
        (Fifo::from_fifo(handle), other_end)
    }

    // Returns `fifo` re-created without `rights_to_remove`, to provoke FIFO
    // errors in the task.
    fn remove_rights<T: FromBytes + IntoBytes + Immutable>(
        fifo: Fifo<T>,
        rights_to_remove: zx::Rights,
    ) -> Fifo<T> {
        let fifo = zx::Fifo::from(fifo);
        let rights = fifo.as_handle_ref().basic_info().expect("can retrieve info").rights;

        let fifo = fifo.replace_handle(rights ^ rights_to_remove).expect("can replace");
        Fifo::from_fifo(fifo)
    }

    // Selects which FIFO gets its rights stripped in the test below.
    enum TxOrRx {
        Tx,
        Rx,
    }
    #[test_case(TxOrRx::Tx, zx::Rights::READ; "tx read")]
    #[test_case(TxOrRx::Tx, zx::Rights::WRITE; "tx write")]
    // NOTE(review): case name says "rx read" but the right removed is WRITE —
    // confirm whether the name or the right is the intended one.
    #[test_case(TxOrRx::Rx, zx::Rights::WRITE; "rx read")]
    #[fuchsia_async::run_singlethreaded(test)]
    async fn task_as_future_poll_error(which_fifo: TxOrRx, right_to_remove: zx::Rights) {
        // Build the session state by hand (no real device) so FIFO failures
        // surface directly through `Task::poll`.
        let (pool, _descriptors, _data) = Pool::new(
            DEFAULT_DEVICE_INFO
                .make_config(DerivableConfig {
                    default_buffer_length: DEFAULT_BUFFER_LENGTH,
                    ..Default::default()
                })
                .expect("is valid"),
        )
        .expect("is valid");
        let (session_proxy, _session_server) =
            fidl::endpoints::create_proxy::<fidl_fuchsia_hardware_network::SessionMarker>();

        let (rx, _rx_sender) = make_fifos();
        let (tx, _tx_receiver) = make_fifos();

        // Strip a right from one of the FIFOs so operations on it fail.
        let (tx, rx) = match which_fifo {
            TxOrRx::Tx => (remove_rights(tx, right_to_remove), rx),
            TxOrRx::Rx => (tx, remove_rights(rx, right_to_remove)),
        };

        let buf = pool.alloc_tx_buffer(1).await.expect("can allocate");
        let inner = Arc::new(Inner {
            pool,
            proxy: session_proxy,
            name: "fake_task".to_string(),
            rx,
            tx,
            tx_pending: Pending::new(vec![]),
            rx_ready: Mutex::new(ReadyBuffer::new(10)),
            tx_ready: Mutex::new(ReadyBuffer::new(10)),
            tx_idle_listeners: TxIdleListeners::new(),
        });

        inner.send(buf).expect("can send");

        let mut task = Task { inner };

        // The rights violation must resolve the task with a FIFO error
        // instead of leaving it pending.
        assert_matches!(futures::poll!(&mut task), Poll::Ready(Err(Error::Fifo(_, _, _))));
    }
}