mod buffer;

use std::fmt::Debug;
use std::mem::MaybeUninit;
use std::num::{NonZeroU16, NonZeroU32, NonZeroU64, TryFromIntError};
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{self, AtomicUsize};
use std::task::Waker;

use explicit::{PollExt as _, ResultExt as _};
use fidl_fuchsia_hardware_network::DelegatedRxLease;
use fidl_table_validation::ValidFidlTable;
use fuchsia_sync::Mutex;
use futures::future::{Future, poll_fn};
use futures::task::{Context, Poll};
use futures::{Stream, ready};
use {fidl_fuchsia_hardware_network as netdev, fuchsia_async as fasync};

use crate::error::{Error, Result};
use buffer::pool::{Pool, RxLeaseWatcher};
use buffer::{
    AllocKind, DescId, NETWORK_DEVICE_DESCRIPTOR_LENGTH, NETWORK_DEVICE_DESCRIPTOR_VERSION,
};
pub use buffer::{Buffer, Rx, Tx};
#[derive(Clone)]
pub struct Session {
    inner: Arc<Inner>,
}

impl Debug for Session {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { inner } = self;
        let Inner {
            name,
            pool: _,
            proxy: _,
            rx: _,
            tx: _,
            tx_pending: _,
            rx_ready: _,
            tx_ready: _,
            tx_idle_listeners: _,
        } = &**inner;
        f.debug_struct("Session").field("name", &name).finish_non_exhaustive()
    }
}

impl Session {
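    /// Opens a new session named `name` on `device` with the given `config`,
    /// returning the `Session` handle together with the [`Task`] that must be
    /// polled to drive I/O.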
    pub async fn new(
        device: &netdev::DeviceProxy,
        name: &str,
        config: Config,
    ) -> Result<(Self, Task)> {
        let inner = Inner::new(device, name, config).await?;
        Ok((Session { inner: Arc::clone(&inner) }, Task { inner }))
    }

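    /// Queues `buffer` for transmission. The buffer is padded and committed,
    /// then handed to the [`Task`] future, which writes it to the driver's TX
    /// FIFO.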
    pub fn send(&self, buffer: Buffer<Tx>) -> Result<()> {
        self.inner.send(buffer)
    }

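    /// Receives the next completed frame from the device's RX FIFO.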
    pub async fn recv(&self) -> Result<Buffer<Rx>> {
        self.inner.recv().await
    }

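    /// Allocates a TX buffer with capacity for at least `num_bytes` of data
    /// from the session's buffer pool.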
    pub async fn alloc_tx_buffer(&self, num_bytes: usize) -> Result<Buffer<Tx>> {
        self.inner.pool.alloc_tx_buffer(num_bytes).await
    }

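    /// Allocates multiple TX buffers at once, returning an iterator that
    /// yields buffers with capacity for at least `num_bytes` each.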
    pub async fn alloc_tx_buffers(
        &self,
        num_bytes: usize,
    ) -> Result<impl Iterator<Item = Result<Buffer<Tx>>> + '_> {
        self.inner.pool.alloc_tx_buffers(num_bytes).await
    }

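    /// Attaches the session to `port`, subscribing to the given RX frame
    /// types on that port.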
    pub async fn attach(&self, port: Port, rx_frames: &[netdev::FrameType]) -> Result<()> {
        let fut = self.inner.proxy.attach(&port.into(), rx_frames);
        let () = fut.await?.map_err(|raw| Error::Attach(port, zx::Status::from_raw(raw)))?;
        Ok(())
    }

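    /// Detaches the session from `port`.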
    pub async fn detach(&self, port: Port) -> Result<()> {
        let () = self
            .inner
            .proxy
            .detach(&port.into())
            .await?
            .map_err(|raw| Error::Detach(port, zx::Status::from_raw(raw)))?;
        Ok(())
    }

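    /// Waits until no TX buffers sent through this session remain in flight,
    /// i.e. until every sent buffer has been returned by the driver.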
    pub async fn wait_tx_idle(&self) {
        self.inner.tx_idle_listeners.wait().await;
    }

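    /// Returns a stream of RX power leases delegated by the device. Each
    /// [`RxLease`] is only yielded after the session has completed the RX
    /// frame it is held until (`hold_until_frame`).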
    pub fn watch_rx_leases(&self) -> impl Stream<Item = Result<RxLease>> + Send + Sync + use<> {
        let inner = Arc::clone(&self.inner);
        let watcher = RxLeaseWatcher::new(Arc::clone(&inner.pool));
        futures::stream::try_unfold((inner, watcher), |(inner, mut watcher)| async move {
            let DelegatedRxLease {
                hold_until_frame,
                handle,
                __source_breaking: fidl::marker::SourceBreaking,
            } = match inner.proxy.watch_delegated_rx_lease().await {
                Ok(lease) => lease,
                Err(e) => {
                    if e.is_closed() {
                        return Ok(None);
                    } else {
                        return Err(Error::Fidl(e));
                    }
                }
            };
            let hold_until_frame = hold_until_frame.ok_or(Error::InvalidLease)?;
            let handle = RxLease { handle: handle.ok_or(Error::InvalidLease)? };

            watcher.wait_until(hold_until_frame).await;
            Ok(Some((handle, (inner, watcher))))
        })
    }
}

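/// State shared between all [`Session`] handles and the [`Task`] driving them.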
struct Inner {
    pool: Arc<Pool>,
    proxy: netdev::SessionProxy,
    name: String,
    rx: fasync::Fifo<DescId<Rx>>,
    tx: fasync::Fifo<DescId<Tx>>,
    tx_pending: Pending<Tx>,
    rx_ready: Mutex<ReadyBuffer<DescId<Rx>>>,
    tx_ready: Mutex<ReadyBuffer<DescId<Tx>>>,
    tx_idle_listeners: TxIdleListeners,
}

impl Inner {
    async fn new(device: &netdev::DeviceProxy, name: &str, config: Config) -> Result<Arc<Self>> {
        let (pool, descriptors, data) = Pool::new(config)?;

        let session_info = {
            let descriptor_length =
                u8::try_from(NETWORK_DEVICE_DESCRIPTOR_LENGTH / std::mem::size_of::<u64>())
                    .expect("descriptor length in 64-bit words not representable by u8");
            netdev::SessionInfo {
                descriptors: Some(descriptors),
                data: Some(data),
                descriptor_version: Some(NETWORK_DEVICE_DESCRIPTOR_VERSION),
                descriptor_length: Some(descriptor_length),
                descriptor_count: Some(config.num_tx_buffers.get() + config.num_rx_buffers.get()),
                options: Some(config.options),
                ..Default::default()
            }
        };

        let (client, netdev::Fifos { rx, tx }) = device
            .open_session(name, session_info)
            .await?
            .map_err(|raw| Error::Open(name.to_owned(), zx::Status::from_raw(raw)))?;
        let proxy = client.into_proxy();
        let rx = fasync::Fifo::from_fifo(rx);
        let tx = fasync::Fifo::from_fifo(tx);

        Ok(Arc::new(Self {
            pool,
            proxy,
            name: name.to_owned(),
            rx,
            tx,
            tx_pending: Pending::new(Vec::new()),
            rx_ready: Mutex::new(ReadyBuffer::new(config.num_rx_buffers.get().into())),
            tx_ready: Mutex::new(ReadyBuffer::new(config.num_tx_buffers.get().into())),
            tx_idle_listeners: TxIdleListeners::new(),
        }))
    }

    fn poll_submit_rx(&self, cx: &mut Context<'_>) -> Poll<Result<usize>> {
        self.pool.rx_pending.poll_submit(&self.rx, cx)
    }

    fn poll_complete_rx(&self, cx: &mut Context<'_>) -> Poll<Result<DescId<Rx>>> {
        let mut rx_ready = self.rx_ready.lock();
        rx_ready.poll_with_fifo(cx, &self.rx).map_err(|status| Error::Fifo("read", "rx", status))
    }

    fn poll_submit_tx(&self, cx: &mut Context<'_>) -> Poll<Result<usize>> {
        self.tx_pending.poll_submit(&self.tx, cx)
    }

    fn poll_complete_tx(&self, cx: &mut Context<'_>) -> Poll<Result<()>> {
        let result = {
            let mut tx_ready = self.tx_ready.lock();
            tx_ready.poll_with_fifo(cx, &self.tx).map(|r| match r {
                Ok(desc) => self.pool.tx_completed(desc),
                Err(status) => Err(Error::Fifo("read", "tx", status)),
            })
        };

        match &result {
            Poll::Ready(Ok(())) => self.tx_idle_listeners.tx_complete(),
            Poll::Pending | Poll::Ready(Err(_)) => {}
        }
        result
    }

    fn send(&self, mut buffer: Buffer<Tx>) -> Result<()> {
        buffer.pad()?;
        buffer.commit();
        self.tx_idle_listeners.tx_started();
        self.tx_pending.extend(std::iter::once(buffer.leak()));
        Ok(())
    }

    async fn recv(&self) -> Result<Buffer<Rx>> {
        poll_fn(|cx| -> Poll<Result<Buffer<Rx>>> {
            let head = ready!(self.poll_complete_rx(cx))?;
            Poll::Ready(self.pool.rx_completed(head))
        })
        .await
    }
}

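/// The future that drives a [`Session`]'s I/O: it submits pending TX and RX
/// descriptors to the driver and reaps completed TX descriptors. It must be
/// polled (e.g. spawned on an executor) for the session to make progress.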
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Task {
    inner: Arc<Inner>,
}

impl Future for Task {
    type Output = Result<()>;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let inner = &Pin::into_inner(self).inner;
        loop {
            let mut all_pending = true;
            while inner.poll_complete_tx(cx)?.is_ready_checked::<()>() {
                all_pending = false;
            }
            if inner.poll_submit_rx(cx)?.is_ready_checked::<usize>() {
                all_pending = false;
            }
            if inner.poll_submit_tx(cx)?.is_ready_checked::<usize>() {
                all_pending = false;
            }
            if all_pending {
                return Poll::Pending;
            }
        }
    }
}

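/// A validated session configuration, derived from a device's reported
/// [`DeviceInfo`] via [`DeviceInfo::make_config`].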
#[derive(Debug, Clone, Copy)]
pub struct Config {
    buffer_stride: NonZeroU64,
    num_rx_buffers: NonZeroU16,
    num_tx_buffers: NonZeroU16,
    options: netdev::SessionFlags,
    buffer_layout: BufferLayout,
}

#[derive(Debug, Clone, Copy)]
struct BufferLayout {
    min_tx_data: usize,
    min_tx_head: u16,
    min_tx_tail: u16,
    length: usize,
}

#[derive(Debug, Clone, ValidFidlTable)]
#[fidl_table_src(netdev::DeviceBaseInfo)]
#[fidl_table_strict]
pub struct DeviceBaseInfo {
    pub rx_depth: u16,
    pub tx_depth: u16,
    pub buffer_alignment: u32,
    #[fidl_field_type(optional)]
    pub max_buffer_length: Option<NonZeroU32>,
    pub min_rx_buffer_length: u32,
    pub min_tx_buffer_length: u32,
    pub min_tx_buffer_head: u16,
    pub min_tx_buffer_tail: u16,
    pub max_buffer_parts: u8,
    #[fidl_field_type(default)]
    pub rx_accel: Vec<netdev::RxAcceleration>,
    #[fidl_field_type(default)]
    pub tx_accel: Vec<netdev::TxAcceleration>,
}

#[derive(Debug, Clone, ValidFidlTable)]
#[fidl_table_src(netdev::DeviceInfo)]
#[fidl_table_strict]
pub struct DeviceInfo {
    pub min_descriptor_length: u8,
    pub descriptor_version: u8,
    pub base_info: DeviceBaseInfo,
}

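/// Knobs used to derive a [`Config`] from a device's reported [`DeviceInfo`].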
#[derive(Debug, Copy, Clone)]
pub struct DerivableConfig {
    pub default_buffer_length: usize,
    pub primary: bool,
    pub watch_rx_leases: bool,
}

impl DerivableConfig {
    pub const DEFAULT_BUFFER_LENGTH: usize = 2048;
    pub const DEFAULT: Self = Self {
        default_buffer_length: Self::DEFAULT_BUFFER_LENGTH,
        primary: true,
        watch_rx_leases: false,
    };
}

impl Default for DerivableConfig {
    fn default() -> Self {
        Self::DEFAULT
    }
}

impl DeviceInfo {
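    /// Derives a session [`Config`] from the device's reported limits,
    /// validating the descriptor version and length as well as the buffer
    /// count, length, alignment, and TX head/tail requirements.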
    pub fn make_config(&self, config: DerivableConfig) -> Result<Config> {
        let DeviceInfo {
            min_descriptor_length,
            descriptor_version,
            base_info:
                DeviceBaseInfo {
                    rx_depth,
                    tx_depth,
                    buffer_alignment,
                    max_buffer_length,
                    min_rx_buffer_length,
                    min_tx_buffer_length,
                    min_tx_buffer_head,
                    min_tx_buffer_tail,
                    max_buffer_parts: _,
                    rx_accel: _,
                    tx_accel: _,
                },
        } = self;
        if NETWORK_DEVICE_DESCRIPTOR_VERSION != *descriptor_version {
            return Err(Error::Config(format!(
                "descriptor version mismatch: {} != {}",
                NETWORK_DEVICE_DESCRIPTOR_VERSION, descriptor_version
            )));
        }
        if NETWORK_DEVICE_DESCRIPTOR_LENGTH < usize::from(*min_descriptor_length) {
            return Err(Error::Config(format!(
                "descriptor length too small: {} < {}",
                NETWORK_DEVICE_DESCRIPTOR_LENGTH, min_descriptor_length
            )));
        }

        let DerivableConfig { default_buffer_length, primary, watch_rx_leases } = config;

        let num_rx_buffers =
            NonZeroU16::new(*rx_depth).ok_or_else(|| Error::Config("no RX buffers".to_owned()))?;
        let num_tx_buffers =
            NonZeroU16::new(*tx_depth).ok_or_else(|| Error::Config("no TX buffers".to_owned()))?;

        let max_buffer_length = max_buffer_length
            .and_then(|max| usize::try_from(max.get()).ok_checked::<TryFromIntError>())
            .unwrap_or(usize::MAX);
        let min_buffer_length = usize::try_from(*min_rx_buffer_length)
            .ok_checked::<TryFromIntError>()
            .unwrap_or(usize::MAX);

        let buffer_length =
            usize::min(max_buffer_length, usize::max(min_buffer_length, default_buffer_length));

        let buffer_alignment = usize::try_from(*buffer_alignment).map_err(
            |std::num::TryFromIntError { .. }| {
                Error::Config(format!(
                    "buffer_alignment not representable within usize: {}",
                    buffer_alignment,
                ))
            },
        )?;

        let buffer_stride = buffer_length
            .checked_add(buffer_alignment - 1)
            .map(|x| x / buffer_alignment * buffer_alignment)
            .ok_or_else(|| {
                Error::Config(format!(
                    "not possible to align {} to {} under usize::MAX",
                    buffer_length, buffer_alignment,
                ))
            })?;

        if buffer_stride < buffer_length {
            return Err(Error::Config(format!(
                "buffer stride too small {} < {}",
                buffer_stride, buffer_length
            )));
        }

        if buffer_length < usize::from(*min_tx_buffer_head) + usize::from(*min_tx_buffer_tail) {
            return Err(Error::Config(format!(
                "buffer length {} does not meet minimum tx buffer head/tail requirement {}/{}",
                buffer_length, min_tx_buffer_head, min_tx_buffer_tail,
            )));
        }

        let num_buffers =
            rx_depth.checked_add(*tx_depth).filter(|num| *num != u16::MAX).ok_or_else(|| {
                Error::Config(format!(
                    "too many buffers requested: {} + {} > u16::MAX",
                    rx_depth, tx_depth
                ))
            })?;

        let buffer_stride =
            u64::try_from(buffer_stride).map_err(|std::num::TryFromIntError { .. }| {
                Error::Config(format!("buffer_stride too big: {} > u64::MAX", buffer_stride))
            })?;

        match buffer_stride.checked_mul(num_buffers.into()).map(isize::try_from) {
            None | Some(Err(std::num::TryFromIntError { .. })) => {
                return Err(Error::Config(format!(
                    "too much memory required for the buffers: {} * {} > isize::MAX",
                    buffer_stride, num_buffers
                )));
            }
            Some(Ok(_total)) => (),
        };

        let buffer_stride = NonZeroU64::new(buffer_stride)
            .ok_or_else(|| Error::Config("buffer_stride is zero".to_owned()))?;

        let min_tx_data = match usize::try_from(*min_tx_buffer_length)
            .map(|min_tx| (min_tx <= buffer_length).then_some(min_tx))
        {
            Ok(Some(min_tx_buffer_length)) => min_tx_buffer_length,
            Ok(None) | Err(std::num::TryFromIntError { .. }) => {
                return Err(Error::Config(format!(
                    "buffer_length smaller than minimum TX requirement: {} < {}",
                    buffer_length, *min_tx_buffer_length
                )));
            }
        };

        let mut options = netdev::SessionFlags::empty();
        options.set(netdev::SessionFlags::PRIMARY, primary);
        options.set(netdev::SessionFlags::RECEIVE_RX_POWER_LEASES, watch_rx_leases);

        Ok(Config {
            buffer_stride,
            num_rx_buffers,
            num_tx_buffers,
            options,
            buffer_layout: BufferLayout {
                length: buffer_length,
                min_tx_head: *min_tx_buffer_head,
                min_tx_tail: *min_tx_buffer_tail,
                min_tx_data,
            },
        })
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Port {
    pub(crate) base: u8,
    pub(crate) salt: u8,
}

impl TryFrom<netdev::PortId> for Port {
    type Error = Error;
    fn try_from(netdev::PortId { base, salt }: netdev::PortId) -> Result<Self> {
        if base <= netdev::MAX_PORTS {
            Ok(Self { base, salt })
        } else {
            Err(Error::InvalidPortId(base))
        }
    }
}

impl From<Port> for netdev::PortId {
    fn from(Port { base, salt }: Port) -> Self {
        Self { base, salt }
    }
}

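/// A queue of descriptors waiting to be submitted to one of the session's
/// FIFOs, plus the waker of the task interested in submitting them.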
struct Pending<K: AllocKind> {
    inner: Mutex<(Vec<DescId<K>>, Option<Waker>)>,
}

impl<K: AllocKind> Pending<K> {
    fn new(descs: Vec<DescId<K>>) -> Self {
        Self { inner: Mutex::new((descs, None)) }
    }

    fn extend(&self, descs: impl IntoIterator<Item = DescId<K>>) {
        let mut guard = self.inner.lock();
        let (storage, waker) = &mut *guard;
        storage.extend(descs);
        if let Some(waker) = waker.take() {
            waker.wake();
        }
    }

    fn poll_submit(
        &self,
        fifo: &fasync::Fifo<DescId<K>>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<usize>> {
        let mut guard = self.inner.lock();
        let (storage, waker) = &mut *guard;
        if storage.is_empty() {
            *waker = Some(cx.waker().clone());
            return Poll::Pending;
        }

        let submitted = ready!(fifo.try_write(cx, &storage[..]))
            .map_err(|status| Error::Fifo("write", K::REFL.as_str(), status))?;
        let _drained = storage.drain(0..submitted);
        Poll::Ready(Ok(submitted))
    }
}

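/// A fixed-capacity scratch buffer used to read batches of completed
/// descriptors out of a FIFO and hand them back one at a time.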
struct ReadyBuffer<T> {
    data: Vec<MaybeUninit<T>>,
    available: Range<usize>,
}

impl<T> Drop for ReadyBuffer<T> {
    fn drop(&mut self) {
        let Self { data, available } = self;
        for initialized in &mut data[available.clone()] {
            // SAFETY: the `available` range tracks exactly the entries that
            // were initialized by a FIFO read and not yet handed out.
            unsafe { initialized.assume_init_drop() }
        }
        *available = 0..0;
    }
}

impl<T> ReadyBuffer<T> {
    fn new(capacity: usize) -> Self {
        let data = std::iter::from_fn(|| Some(MaybeUninit::uninit())).take(capacity).collect();
        Self { data, available: 0..0 }
    }

    fn poll_with_fifo(
        &mut self,
        cx: &mut Context<'_>,
        fifo: &fuchsia_async::Fifo<T>,
    ) -> Poll<std::result::Result<T, zx::Status>>
    where
        T: fasync::FifoEntry,
    {
        let Self { data, available: Range { start, end } } = self;

        loop {
            if *start != *end {
                let desc = std::mem::replace(&mut data[*start], MaybeUninit::uninit());
                *start += 1;
                // SAFETY: entries inside `start..end` were initialized by the
                // last successful FIFO read.
                let desc = unsafe { desc.assume_init() };
                return Poll::Ready(Ok(desc));
            }
            let count = ready!(fifo.try_read(cx, &mut data[..]))?;
            *start = 0;
            *end = count;
        }
    }
}

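/// Tracks the number of TX buffers in flight and notifies tasks waiting for
/// the session to become TX-idle.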
struct TxIdleListeners {
    event: event_listener::Event,
    tx_in_flight: AtomicUsize,
}

impl TxIdleListeners {
    fn new() -> Self {
        Self { event: event_listener::Event::new(), tx_in_flight: AtomicUsize::new(0) }
    }

    fn tx_complete(&self) {
        let Self { event, tx_in_flight } = self;
        let old_value = tx_in_flight.fetch_sub(1, atomic::Ordering::SeqCst);
        debug_assert_ne!(old_value, 0);
        if old_value == 1 {
            let _notified: usize = event.notify(usize::MAX);
        }
    }

    fn tx_started(&self) {
        let Self { event: _, tx_in_flight } = self;
        let _: usize = tx_in_flight.fetch_add(1, atomic::Ordering::SeqCst);
    }

    async fn wait(&self) {
        let Self { event, tx_in_flight } = self;
        loop {
            if tx_in_flight.load(atomic::Ordering::SeqCst) == 0 {
                return;
            }

            event_listener::listener!(event => listener);

            // Re-check after registering the listener so a notification that
            // fired in between the first check and the registration is not
            // missed.
            if tx_in_flight.load(atomic::Ordering::SeqCst) == 0 {
                return;
            }

            listener.await;
        }
    }
}

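/// An RX power lease delegated by the device, yielded by
/// [`Session::watch_rx_leases`]. Dropping it drops the underlying handle.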
#[derive(Debug)]
pub struct RxLease {
    handle: netdev::DelegatedRxLeaseHandle,
}

impl Drop for RxLease {
    fn drop(&mut self) {
        let Self { handle } = self;
        match handle {
            netdev::DelegatedRxLeaseHandle::Channel(_channel) => {
                // Nothing to do; dropping the channel closes it.
            }
            netdev::DelegatedRxLeaseHandle::Eventpair(_eventpair) => {
                // Nothing to do; dropping the eventpair closes it.
            }
            netdev::DelegatedRxLeaseHandle::__SourceBreaking { .. } => {}
        }
    }
}

impl RxLease {
    pub fn inner(&self) -> &netdev::DelegatedRxLeaseHandle {
        &self.handle
    }
}

#[cfg(test)]
mod tests {
    use std::num::NonZeroU32;
    use std::ops::Deref;
    use std::sync::Arc;
    use std::task::Poll;

    use assert_matches::assert_matches;
    use fuchsia_async::Fifo;
    use test_case::test_case;
    use zerocopy::{FromBytes, Immutable, IntoBytes};
    use zx::{AsHandleRef as _, HandleBased as _};

    use crate::session::DerivableConfig;

    use super::buffer::{
        AllocKind, DescId, NETWORK_DEVICE_DESCRIPTOR_LENGTH, NETWORK_DEVICE_DESCRIPTOR_VERSION,
    };
    use super::{
        BufferLayout, Config, DeviceBaseInfo, DeviceInfo, Error, Inner, Mutex, Pending, Pool,
        ReadyBuffer, Task, TxIdleListeners,
    };

    const DEFAULT_DEVICE_BASE_INFO: DeviceBaseInfo = DeviceBaseInfo {
        rx_depth: 1,
        tx_depth: 1,
        buffer_alignment: 1,
        max_buffer_length: None,
        min_rx_buffer_length: 0,
        min_tx_buffer_head: 0,
        min_tx_buffer_length: 0,
        min_tx_buffer_tail: 0,
        max_buffer_parts: fidl_fuchsia_hardware_network::MAX_DESCRIPTOR_CHAIN,
        rx_accel: Vec::new(),
        tx_accel: Vec::new(),
    };

    const DEFAULT_DEVICE_INFO: DeviceInfo = DeviceInfo {
        min_descriptor_length: 0,
        descriptor_version: 1,
        base_info: DEFAULT_DEVICE_BASE_INFO,
    };

    const DEFAULT_BUFFER_LENGTH: usize = 2048;

    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        min_descriptor_length: u8::MAX,
        ..DEFAULT_DEVICE_INFO
    }, format!("descriptor length too small: {} < {}", NETWORK_DEVICE_DESCRIPTOR_LENGTH, u8::MAX))]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        descriptor_version: 42,
        ..DEFAULT_DEVICE_INFO
    }, format!("descriptor version mismatch: {} != {}", NETWORK_DEVICE_DESCRIPTOR_VERSION, 42))]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            tx_depth: 0,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, "no TX buffers")]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            rx_depth: 0,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, "no RX buffers")]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            tx_depth: u16::MAX,
            rx_depth: u16::MAX,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, format!("too many buffers requested: {} + {} > u16::MAX", u16::MAX, u16::MAX))]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            min_tx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 + 1,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, format!(
        "buffer_length smaller than minimum TX requirement: {} < {}",
        DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            min_tx_buffer_head: DEFAULT_BUFFER_LENGTH as u16 + 1,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, format!(
        "buffer length {} does not meet minimum tx buffer head/tail requirement {}/0",
        DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
    #[test_case(DEFAULT_BUFFER_LENGTH, DeviceInfo {
        base_info: DeviceBaseInfo {
            min_tx_buffer_tail: DEFAULT_BUFFER_LENGTH as u16 + 1,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, format!(
        "buffer length {} does not meet minimum tx buffer head/tail requirement 0/{}",
        DEFAULT_BUFFER_LENGTH, DEFAULT_BUFFER_LENGTH + 1))]
    #[test_case(0, DEFAULT_DEVICE_INFO, "buffer_stride is zero")]
    #[test_case(usize::MAX, DEFAULT_DEVICE_INFO,
    format!(
        "too much memory required for the buffers: {} * {} > isize::MAX",
        usize::MAX, 2))]
    #[test_case(usize::MAX, DeviceInfo {
        base_info: DeviceBaseInfo {
            buffer_alignment: 2,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, format!(
        "not possible to align {} to {} under usize::MAX",
        usize::MAX, 2))]
    fn configs_from_device_info_err(
        buffer_length: usize,
        info: DeviceInfo,
        expected: impl Deref<Target = str>,
    ) {
        let config = DerivableConfig { default_buffer_length: buffer_length, ..Default::default() };
        assert_matches!(
            info.make_config(config),
            Err(Error::Config(got)) if got.as_str() == expected.deref()
        );
    }

    #[test_case(DeviceInfo {
        base_info: DeviceBaseInfo {
            min_rx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 + 1,
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, DEFAULT_BUFFER_LENGTH + 1; "default below min")]
    #[test_case(DeviceInfo {
        base_info: DeviceBaseInfo {
            max_buffer_length: NonZeroU32::new(DEFAULT_BUFFER_LENGTH as u32 - 1),
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, DEFAULT_BUFFER_LENGTH - 1; "default above max")]
    #[test_case(DeviceInfo {
        base_info: DeviceBaseInfo {
            min_rx_buffer_length: DEFAULT_BUFFER_LENGTH as u32 - 1,
            max_buffer_length: NonZeroU32::new(DEFAULT_BUFFER_LENGTH as u32 + 1),
            ..DEFAULT_DEVICE_BASE_INFO
        },
        ..DEFAULT_DEVICE_INFO
    }, DEFAULT_BUFFER_LENGTH; "default in bounds")]
    fn configs_from_device_buffer_length(info: DeviceInfo, expected_length: usize) {
        let config = info
            .make_config(DerivableConfig {
                default_buffer_length: DEFAULT_BUFFER_LENGTH,
                ..Default::default()
            })
            .expect("is valid");
        let Config {
            buffer_layout: BufferLayout { length, min_tx_data: _, min_tx_head: _, min_tx_tail: _ },
            buffer_stride: _,
            num_rx_buffers: _,
            num_tx_buffers: _,
            options: _,
        } = config;
        assert_eq!(length, expected_length);
    }

    fn make_fifos<K: AllocKind>() -> (Fifo<DescId<K>>, zx::Fifo<DescId<K>>) {
        let (handle, other_end) = zx::Fifo::create(1).unwrap();
        (Fifo::from_fifo(handle), other_end)
    }

    fn remove_rights<T: FromBytes + IntoBytes + Immutable>(
        fifo: Fifo<T>,
        rights_to_remove: zx::Rights,
    ) -> Fifo<T> {
        let fifo = zx::Fifo::from(fifo);
        let rights = fifo.as_handle_ref().basic_info().expect("can retrieve info").rights;

        let fifo = fifo.replace_handle(rights ^ rights_to_remove).expect("can replace");
        Fifo::from_fifo(fifo)
    }

    enum TxOrRx {
        Tx,
        Rx,
    }
    #[test_case(TxOrRx::Tx, zx::Rights::READ; "tx read")]
    #[test_case(TxOrRx::Tx, zx::Rights::WRITE; "tx write")]
    #[test_case(TxOrRx::Rx, zx::Rights::WRITE; "rx write")]
    #[fuchsia_async::run_singlethreaded(test)]
    async fn task_as_future_poll_error(which_fifo: TxOrRx, right_to_remove: zx::Rights) {
        let (pool, _descriptors, _data) = Pool::new(
            DEFAULT_DEVICE_INFO
                .make_config(DerivableConfig {
                    default_buffer_length: DEFAULT_BUFFER_LENGTH,
                    ..Default::default()
                })
                .expect("is valid"),
        )
        .expect("is valid");
        let (session_proxy, _session_server) =
            fidl::endpoints::create_proxy::<fidl_fuchsia_hardware_network::SessionMarker>();

        let (rx, _rx_sender) = make_fifos();
        let (tx, _tx_receiver) = make_fifos();

        // Break the selected FIFO by stripping the right under test.
        let (tx, rx) = match which_fifo {
            TxOrRx::Tx => (remove_rights(tx, right_to_remove), rx),
            TxOrRx::Rx => (tx, remove_rights(rx, right_to_remove)),
        };

        let buf = pool.alloc_tx_buffer(1).await.expect("can allocate");
        let inner = Arc::new(Inner {
            pool,
            proxy: session_proxy,
            name: "fake_task".to_string(),
            rx,
            tx,
            tx_pending: Pending::new(vec![]),
            rx_ready: Mutex::new(ReadyBuffer::new(10)),
            tx_ready: Mutex::new(ReadyBuffer::new(10)),
            tx_idle_listeners: TxIdleListeners::new(),
        });

        inner.send(buf).expect("can send");

        let mut task = Task { inner };

        // Polling the task should surface the FIFO error instead of hanging.
        assert_matches!(futures::poll!(&mut task), Poll::Ready(Err(Error::Fifo(_, _, _))));
    }
}