use std::borrow::Borrow;
use std::collections::VecDeque;
use std::convert::TryInto as _;
use std::fmt::Debug;
use std::io::{Read, Seek, SeekFrom, Write};
use std::mem::{ManuallyDrop, MaybeUninit};
use std::num::TryFromIntError;
use std::ops::{Deref, DerefMut};
use std::ptr::NonNull;
use std::sync::atomic::{self, AtomicBool, AtomicU64};
use std::sync::Arc;
use std::task::Poll;

use explicit::ResultExt as _;
use fidl_fuchsia_hardware_network as netdev;
use fuchsia_runtime::vmar_root_self;
use fuchsia_sync::Mutex;
use futures::channel::oneshot::{channel, Receiver, Sender};
use futures::task::AtomicWaker;

use super::{ChainLength, DescId, DescRef, DescRefMut, Descriptors};
use crate::error::{Error, Result};
use crate::session::{BufferLayout, Config, Pending, Port};
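/// A pool of tx/rx buffers shared with the device.
///
/// The pool owns the mapping of the data VMO, the descriptor table, the tx
/// free list (plus any queued tx allocation requests), and the queue of rx
/// descriptors waiting to be handed back to the device.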
pub(in crate::session) struct Pool {
base: NonNull<u8>,
bytes: usize,
descriptors: Descriptors,
tx_alloc_state: Mutex<TxAllocState>,
pub(in crate::session) rx_pending: Pending<Rx>,
buffer_layout: BufferLayout,
rx_leases: RxLeaseHandlingState,
}
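// SAFETY: `Pool` owns the data VMO mapping pointed to by `base` for its whole
// lifetime (it is unmapped in `Drop`), and its shared mutable state is guarded
// by `Mutex`es and atomics, so it can be sent to and shared between threads.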
unsafe impl Send for Pool {}
unsafe impl Sync for Pool {}
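/// Allocation state for tx buffers: the free list of tx descriptors and a FIFO
/// of allocation requests that could not be satisfied immediately.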
struct TxAllocState {
requests: VecDeque<TxAllocReq>,
free_list: TxFreeList,
}
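/// A singly-linked free list of tx descriptors, chained through the
/// descriptors' `nxt` fields.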
struct TxFreeList {
head: Option<DescId<Tx>>,
len: u16,
}
impl Pool {
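/// Creates a new `Pool` from `config`.
///
/// Returns the pool together with the descriptors VMO and the data VMO that
/// are to be shared with the device.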
pub(in crate::session) fn new(config: Config) -> Result<(Arc<Self>, zx::Vmo, zx::Vmo)> {
let Config { buffer_stride, num_rx_buffers, num_tx_buffers, options, buffer_layout } =
config;
let num_buffers = num_rx_buffers.get() + num_tx_buffers.get();
let (descriptors, descriptors_vmo, tx_free, mut rx_free) =
Descriptors::new(num_tx_buffers, num_rx_buffers, buffer_stride)?;
let free_head = tx_free.into_iter().rev().fold(None, |head, mut curr| {
descriptors.borrow_mut(&mut curr).set_nxt(head);
Some(curr)
});
for rx_desc in rx_free.iter_mut() {
descriptors.borrow_mut(rx_desc).initialize(
ChainLength::ZERO,
0,
buffer_layout.length.try_into().unwrap(),
0,
);
}
let tx_alloc_state = TxAllocState {
free_list: TxFreeList { head: free_head, len: num_tx_buffers.get() },
requests: VecDeque::new(),
};
let size = buffer_stride.get() * u64::from(num_buffers);
let data_vmo = zx::Vmo::create(size).map_err(|status| Error::Vmo("data", status))?;
let len = isize::try_from(size).expect("VMO size larger than isize::MAX") as usize;
let base = NonNull::new(
vmar_root_self()
.map(0, &data_vmo, 0, len, zx::VmarFlags::PERM_READ | zx::VmarFlags::PERM_WRITE)
.map_err(|status| Error::Map("data", status))? as *mut u8,
)
.unwrap();
Ok((
Arc::new(Pool {
base,
bytes: len,
descriptors,
tx_alloc_state: Mutex::new(tx_alloc_state),
rx_pending: Pending::new(rx_free),
buffer_layout,
rx_leases: RxLeaseHandlingState::new_with_flags(options),
}),
descriptors_vmo,
data_vmo,
))
}
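/// Allocates a chain of `num_parts` tx descriptors, waiting until enough
/// descriptors are returned to the free list if necessary.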
pub(in crate::session) async fn alloc_tx(
self: &Arc<Self>,
num_parts: ChainLength,
) -> AllocGuard<Tx> {
let receiver = {
let mut state = self.tx_alloc_state.lock();
match state.free_list.try_alloc(num_parts, &self.descriptors) {
Some(allocated) => {
return AllocGuard::new(allocated, self.clone());
}
None => {
let (request, receiver) = TxAllocReq::new(num_parts);
state.requests.push_back(request);
receiver
}
}
};
receiver.await.unwrap()
}
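/// Allocates a single tx buffer with at least `num_bytes` of capacity.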
pub(in crate::session) async fn alloc_tx_buffer(
self: &Arc<Self>,
num_bytes: usize,
) -> Result<Buffer<Tx>> {
self.alloc_tx_buffers(num_bytes).await?.next().unwrap()
}
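/// Waits until at least one tx buffer of `num_bytes` is available, then
/// returns an iterator yielding further buffers of the same size for as long
/// as the free list can satisfy them. The iterator is fused: once it returns
/// `None` it will not observe descriptors freed later.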
pub(in crate::session) async fn alloc_tx_buffers<'a>(
self: &'a Arc<Self>,
num_bytes: usize,
) -> Result<impl Iterator<Item = Result<Buffer<Tx>>> + 'a> {
let BufferLayout { min_tx_data, min_tx_head, min_tx_tail, length: buffer_length } =
self.buffer_layout;
let tx_head = usize::from(min_tx_head);
let tx_tail = usize::from(min_tx_tail);
let total_bytes = num_bytes.max(min_tx_data) + tx_head + tx_tail;
let num_parts = (total_bytes + buffer_length - 1) / buffer_length;
let chain_length = ChainLength::try_from(num_parts)?;
let first = self.alloc_tx(chain_length).await;
let iter = std::iter::once(first)
.chain(std::iter::from_fn(move || {
let mut state = self.tx_alloc_state.lock();
state
.free_list
.try_alloc(chain_length, &self.descriptors)
.map(|allocated| AllocGuard::new(allocated, self.clone()))
}))
.fuse()
.map(move |mut alloc| {
alloc.init(num_bytes)?;
Ok(alloc.into())
});
Ok(iter)
}
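/// Returns rx descriptors to the pending queue, resetting each one to cover a
/// full buffer again.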
pub(in crate::session) fn free_rx(&self, descs: impl IntoIterator<Item = DescId<Rx>>) {
self.rx_pending.extend(descs.into_iter().map(|mut desc| {
self.descriptors.borrow_mut(&mut desc).initialize(
ChainLength::ZERO,
0,
self.buffer_layout.length.try_into().unwrap(),
0,
);
desc
}));
}
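/// Returns a chain of tx descriptors to the free list and uses the
/// replenished list to fulfill as many queued allocation requests as
/// possible, discarding requests whose receivers have been dropped.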
fn free_tx(self: &Arc<Self>, chain: Chained<DescId<Tx>>) {
let free_impl = |free_list: &mut TxFreeList, chain: Chained<DescId<Tx>>| {
let mut descs = chain.into_iter();
free_list.len += u16::try_from(descs.len()).unwrap();
let head = descs.next();
let old_head = std::mem::replace(&mut free_list.head, head);
let mut tail = descs.last();
let mut tail_ref = self
.descriptors
.borrow_mut(tail.as_mut().unwrap_or_else(|| free_list.head.as_mut().unwrap()));
tail_ref.set_nxt(old_head);
};
let mut state = self.tx_alloc_state.lock();
let TxAllocState { requests, free_list } = &mut *state;
let () = free_impl(free_list, chain);
while let Some(req) = requests.front() {
match free_list.try_alloc(req.size, &self.descriptors) {
Some(descs) => {
match requests
.pop_front()
.unwrap()
.sender
.send(AllocGuard::new(descs, self.clone()))
.map_err(ManuallyDrop::new)
{
Ok(()) => {}
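// The requester went away before the allocation could be
// delivered. Dropping the returned guard here would call
// `free_tx` and try to lock `tx_alloc_state`, which is already
// held, so return the descriptors to the free list manually and
// only drop the `Arc<Pool>` inside the guard.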
Err(mut alloc) => {
let AllocGuard { descs, pool } = alloc.deref_mut();
let () =
free_impl(free_list, std::mem::replace(descs, Chained::empty()));
let () = unsafe {
std::ptr::drop_in_place(pool);
};
}
}
}
None => {
if req.sender.is_canceled() {
let _cancelled: Option<TxAllocReq> = requests.pop_front();
continue;
} else {
break;
}
}
}
}
}
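/// Reclaims the descriptor chain starting at `head` after the device reports
/// the tx frame as completed.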
pub(in crate::session) fn tx_completed(self: &Arc<Self>, head: DescId<Tx>) -> Result<()> {
let chain = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
Ok(self.free_tx(chain))
}
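/// Wraps the descriptor chain starting at `head` into a `Buffer<Rx>` when the
/// device delivers an rx frame.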
pub(in crate::session) fn rx_completed(
self: &Arc<Self>,
head: DescId<Rx>,
) -> Result<Buffer<Rx>> {
let descs = self.descriptors.chain(head).collect::<Result<Chained<_>>>()?;
let alloc = AllocGuard::new(descs, self.clone());
Ok(alloc.into())
}
}
impl Drop for Pool {
fn drop(&mut self) {
unsafe {
vmar_root_self()
.unmap(self.base.as_ptr() as usize, self.bytes)
.expect("failed to unmap VMO for Pool")
}
}
}
impl TxFreeList {
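/// Pops `num_parts` descriptors off the free list, or returns `None` without
/// taking anything if the list is too short.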
fn try_alloc(
&mut self,
num_parts: ChainLength,
descriptors: &Descriptors,
) -> Option<Chained<DescId<Tx>>> {
if u16::from(num_parts.get()) > self.len {
return None;
}
let free_list = std::iter::from_fn(|| -> Option<DescId<Tx>> {
let new_head = self.head.as_ref().and_then(|head| {
let nxt = descriptors.borrow(head).nxt();
nxt.map(|id| unsafe {
DescId::from_raw(id)
})
});
std::mem::replace(&mut self.head, new_head)
});
let allocated = free_list.take(num_parts.get().into()).collect::<Chained<_>>();
assert_eq!(allocated.len(), num_parts.into());
self.len -= u16::from(num_parts.get());
Some(allocated)
}
}
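/// A buffer backed by an allocated descriptor chain.
///
/// `parts` holds one [`BufferPart`] view per descriptor's data region and
/// `pos` is the cursor used by the [`Read`]/[`Write`]/[`Seek`]
/// implementations.
///
/// A sketch of typical usage, mirroring the unit tests below (`pool` is
/// assumed to be an `Arc<Pool>` built from a session `Config`):
///
/// ```ignore
/// let mut buffer = pool.alloc_tx_buffer(64).await?;
/// buffer.write_at(0, &[0xaa; 64])?;
/// let mut read_back = [0u8; 64];
/// buffer.read_at(0, &mut read_back)?;
/// assert_eq!(read_back, [0xaa; 64]);
/// ```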
pub struct Buffer<K: AllocKind> {
alloc: AllocGuard<K>,
parts: Chained<BufferPart>,
pos: usize,
}
impl<K: AllocKind> Debug for Buffer<K> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self { alloc, parts, pos } = self;
f.debug_struct("Buffer")
.field("cap", &self.cap())
.field("alloc", alloc)
.field("parts", parts)
.field("pos", pos)
.finish()
}
}
impl<K: AllocKind> Buffer<K> {
pub fn cap(&self) -> usize {
self.parts.iter().fold(0, |acc, part| acc + part.cap)
}
pub fn len(&self) -> usize {
self.parts.iter().fold(0, |acc, part| acc + part.len)
}
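/// Writes `src` into the buffer starting at `offset`, splitting the write
/// across parts as needed; parts that lie entirely before `offset` are marked
/// fully used. Fails with `Error::TooSmall` if `offset + src.len()` exceeds
/// the buffer's capacity.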
pub fn write_at(&mut self, offset: usize, src: &[u8]) -> Result<()> {
if self.cap() < offset + src.len() {
return Err(Error::TooSmall { size: self.cap(), offset, length: src.len() });
}
let mut part_start = 0;
let mut total = 0;
for part in self.parts.iter_mut() {
if offset + total < part_start + part.cap {
let written = part.write_at(offset + total - part_start, &src[total..])?;
total += written;
if total == src.len() {
break;
}
} else {
part.len = part.cap;
}
part_start += part.cap;
}
assert_eq!(total, src.len());
Ok(())
}
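/// Reads `dst.len()` bytes starting at `offset` from the written portion of
/// the buffer. Fails with `Error::TooSmall` if fewer than `offset +
/// dst.len()` bytes have been written.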
pub fn read_at(&self, offset: usize, dst: &mut [u8]) -> Result<()> {
if self.len() < offset + dst.len() {
return Err(Error::TooSmall { size: self.len(), offset, length: dst.len() });
}
let mut part_start = 0;
let mut total = 0;
for part in self.parts.iter() {
if offset + total < part_start + part.cap {
let read = part.read_at(offset + total - part_start, &mut dst[total..])?;
total += read;
if total == dst.len() {
break;
}
}
part_start += part.cap;
}
assert_eq!(total, dst.len());
Ok(())
}
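/// Pads the buffer with zeroes up to the device's minimum tx data length,
/// growing the last part into its tail space when the capacity is not enough.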
pub(in crate::session) fn pad(&mut self) -> Result<()> {
let num_parts = self.parts.len();
let BufferLayout { min_tx_tail, min_tx_data, min_tx_head: _, length: _ } =
self.alloc.pool.buffer_layout;
let mut target = min_tx_data;
for (i, part) in self.parts.iter_mut().enumerate() {
let grow_cap = if i == num_parts - 1 {
let descriptor =
self.alloc.descriptors().last().expect("descriptors must not be empty");
let data_length = descriptor.data_length();
let tail_length = descriptor.tail_length();
let rest = usize::try_from(data_length).unwrap() + usize::from(tail_length);
match rest.checked_sub(usize::from(min_tx_tail)) {
Some(grow_cap) => Some(grow_cap),
None => break,
}
} else {
None
};
target -= part.pad(target, grow_cap)?;
}
if target != 0 {
return Err(Error::Pad(min_tx_data, self.cap()));
}
Ok(())
}
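/// Consumes the buffer and returns the head descriptor of the chain without
/// freeing it, so that ownership can be transferred to the device.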
pub(in crate::session) fn leak(mut self) -> DescId<K> {
let descs = std::mem::replace(&mut self.alloc.descs, Chained::empty());
descs.into_iter().next().unwrap()
}
pub fn frame_type(&self) -> Result<netdev::FrameType> {
self.alloc.descriptor().frame_type()
}
pub fn port(&self) -> Port {
self.alloc.descriptor().port()
}
}
impl Buffer<Tx> {
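/// Commits the written length of each part to its descriptor so the device
/// knows how many bytes of every part to send.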
pub(in crate::session) fn commit(&mut self) {
for (part, mut descriptor) in self.parts.iter_mut().zip(self.alloc.descriptors_mut()) {
descriptor.commit(u32::try_from(part.len).unwrap())
}
}
pub fn set_port(&mut self, port: Port) {
self.alloc.descriptor_mut().set_port(port)
}
pub fn set_frame_type(&mut self, frame_type: netdev::FrameType) {
self.alloc.descriptor_mut().set_frame_type(frame_type)
}
pub fn set_tx_flags(&mut self, flags: netdev::TxFlags) {
self.alloc.descriptor_mut().set_tx_flags(flags)
}
}
impl Buffer<Rx> {
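/// Converts a received buffer into a tx buffer (e.g. to echo a frame back),
/// waiting for a tx descriptor chain of the same length and swapping it with
/// the rx chain.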
pub async fn into_tx(self) -> Buffer<Tx> {
let Buffer { alloc, parts, pos } = self;
Buffer { alloc: alloc.into_tx().await, parts, pos }
}
pub fn rx_flags(&self) -> Result<netdev::RxFlags> {
self.alloc.descriptor().rx_flags()
}
}
impl AllocGuard<Rx> {
async fn into_tx(mut self) -> AllocGuard<Tx> {
let mut tx = self.pool.alloc_tx(self.descs.len).await;
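// Swap the underlying descriptor ids: the rx descriptors become the tx
// allocation that is returned, while the freshly allocated tx descriptors
// are recycled as rx buffers when `self` is dropped. Both chains have the
// same length, so only the storage needs to be exchanged.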
std::mem::swap(&mut self.descs.storage, unsafe {
std::mem::transmute(&mut tx.descs.storage)
});
tx
}
}
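/// A fixed-capacity, inline container holding at most
/// `netdev::MAX_DESCRIPTOR_CHAIN` elements; used for descriptor chains and
/// their buffer parts without heap allocation. Only the first `len` slots are
/// initialized.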
struct Chained<T> {
storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
len: ChainLength,
}
impl<T> Deref for Chained<T> {
type Target = [T];
fn deref(&self) -> &Self::Target {
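// SAFETY: the first `len` elements of `storage` are always initialized.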
unsafe { std::mem::transmute(&self.storage[..self.len.into()]) }
}
}
impl<T> DerefMut for Chained<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { std::mem::transmute(&mut self.storage[..self.len.into()]) }
}
}
impl<T> Drop for Chained<T> {
fn drop(&mut self) {
unsafe {
std::ptr::drop_in_place(self.deref_mut());
}
}
}
impl<T: Debug> Debug for Chained<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_list().entries(self.iter()).finish()
}
}
impl<T> Chained<T> {
#[allow(clippy::uninit_assumed_init)]
fn empty() -> Self {
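// An array of `MaybeUninit` needs no initialization, so `assume_init` on
// the outer array is sound even though the elements are uninitialized.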
Self { storage: unsafe { MaybeUninit::uninit().assume_init() }, len: ChainLength::ZERO }
}
}
impl<T> FromIterator<T> for Chained<T> {
fn from_iter<I: IntoIterator<Item = T>>(elements: I) -> Self {
let mut result = Self::empty();
let mut len = 0u8;
for (idx, e) in elements.into_iter().enumerate() {
result.storage[idx] = MaybeUninit::new(e);
len += 1;
}
assert!(len > 0);
result.len = ChainLength::try_from(len).unwrap();
result
}
}
impl<T> IntoIterator for Chained<T> {
type Item = T;
type IntoIter = ChainedIter<T>;
fn into_iter(mut self) -> Self::IntoIter {
let len = self.len;
self.len = ChainLength::ZERO;
#[allow(clippy::uninit_assumed_init)]
let storage =
std::mem::replace(&mut self.storage, unsafe { MaybeUninit::uninit().assume_init() });
ChainedIter { storage, len, consumed: 0 }
}
}
struct ChainedIter<T> {
storage: [MaybeUninit<T>; netdev::MAX_DESCRIPTOR_CHAIN as usize],
len: ChainLength,
consumed: u8,
}
impl<T> Iterator for ChainedIter<T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
if self.consumed < self.len.get() {
let value = unsafe {
std::mem::replace(
&mut self.storage[usize::from(self.consumed)],
MaybeUninit::uninit(),
)
.assume_init()
};
self.consumed += 1;
Some(value)
} else {
None
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
let len = usize::from(self.len.get() - self.consumed);
(len, Some(len))
}
}
impl<T> ExactSizeIterator for ChainedIter<T> {}
impl<T> Drop for ChainedIter<T> {
fn drop(&mut self) {
unsafe {
std::ptr::drop_in_place(std::mem::transmute::<_, &mut [T]>(
&mut self.storage[self.consumed.into()..self.len.into()],
));
}
}
}
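/// An RAII guard over an allocated descriptor chain. Dropping the guard
/// returns the descriptors to the pool: the tx free list or the rx pending
/// queue, depending on `K`.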
pub(in crate::session) struct AllocGuard<K: AllocKind> {
descs: Chained<DescId<K>>,
pool: Arc<Pool>,
}
impl<K: AllocKind> Debug for AllocGuard<K> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self { descs, pool: _ } = self;
f.debug_struct("AllocGuard").field("descs", descs).finish()
}
}
impl<K: AllocKind> AllocGuard<K> {
fn new(descs: Chained<DescId<K>>, pool: Arc<Pool>) -> Self {
Self { descs, pool }
}
fn descriptors(&self) -> impl Iterator<Item = DescRef<'_, K>> + '_ {
self.descs.iter().map(move |desc| self.pool.descriptors.borrow(desc))
}
fn descriptors_mut(&mut self) -> impl Iterator<Item = DescRefMut<'_, K>> + '_ {
let descriptors = &self.pool.descriptors;
self.descs.iter_mut().map(move |desc| descriptors.borrow_mut(desc))
}
fn descriptor(&self) -> DescRef<'_, K> {
self.descriptors().next().expect("descriptors must not be empty")
}
fn descriptor_mut(&mut self) -> DescRefMut<'_, K> {
self.descriptors_mut().next().expect("descriptors must not be empty")
}
}
impl AllocGuard<Tx> {
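/// Initializes the descriptors of the chain to cover `requested_bytes` of
/// data: head room goes on the first descriptor, tail room on the last, and
/// the tail grows when the request is smaller than the available space.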
fn init(&mut self, mut requested_bytes: usize) -> Result<()> {
let len = self.len();
let BufferLayout { min_tx_head, min_tx_tail, length: buffer_length, min_tx_data: _ } =
self.pool.buffer_layout;
for (mut descriptor, clen) in self.descriptors_mut().zip((0..len).rev()) {
let chain_length = ChainLength::try_from(clen).unwrap();
let head_length = if clen + 1 == len { min_tx_head } else { 0 };
let mut tail_length = if clen == 0 { min_tx_tail } else { 0 };
let available_bytes =
u32::try_from(buffer_length - usize::from(head_length) - usize::from(tail_length))
.unwrap();
let data_length = match u32::try_from(requested_bytes) {
Ok(requested) => {
if requested < available_bytes {
tail_length = u16::try_from(available_bytes - requested)
.ok_checked::<TryFromIntError>()
.and_then(|tail_adjustment| tail_length.checked_add(tail_adjustment))
.ok_or(Error::TxLength)?;
}
requested.min(available_bytes)
}
Err(TryFromIntError { .. }) => available_bytes,
};
requested_bytes -=
usize::try_from(data_length).unwrap_or_else(|TryFromIntError { .. }| {
panic!(
"data_length: {} must be smaller than requested_bytes: {}, which is a usize",
data_length, requested_bytes
)
});
descriptor.initialize(chain_length, head_length, data_length, tail_length);
}
assert_eq!(requested_bytes, 0);
Ok(())
}
}
impl<K: AllocKind> Drop for AllocGuard<K> {
fn drop(&mut self) {
if self.is_empty() {
return;
}
K::free(private::Allocation(self));
}
}
impl<K: AllocKind> Deref for AllocGuard<K> {
type Target = [DescId<K>];
fn deref(&self) -> &Self::Target {
self.descs.deref()
}
}
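/// One contiguous region of pool memory backing a single descriptor: `ptr`
/// points into the data VMO mapping, `cap` is the usable capacity and `len`
/// the number of bytes written so far.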
struct BufferPart {
ptr: *mut u8,
cap: usize,
len: usize,
}
impl BufferPart {
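/// # Safety
///
/// `ptr` must point to a region of at least `cap` bytes that stays valid, and
/// is not accessed through any other alias, for the lifetime of the returned
/// `BufferPart`.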
unsafe fn new(ptr: *mut u8, cap: usize, len: usize) -> Self {
Self { ptr, cap, len }
}
fn read_at(&self, offset: usize, dst: &mut [u8]) -> Result<usize> {
let available = self.len.checked_sub(offset).ok_or(Error::Index(offset, self.len))?;
let to_copy = std::cmp::min(available, dst.len());
unsafe { std::ptr::copy_nonoverlapping(self.ptr.add(offset), dst.as_mut_ptr(), to_copy) }
Ok(to_copy)
}
fn write_at(&mut self, offset: usize, src: &[u8]) -> Result<usize> {
let available = self.cap.checked_sub(offset).ok_or(Error::Index(offset, self.cap))?;
let to_copy = std::cmp::min(src.len(), available);
unsafe { std::ptr::copy_nonoverlapping(src.as_ptr(), self.ptr.add(offset), to_copy) }
self.len = std::cmp::max(self.len, offset + to_copy);
Ok(to_copy)
}
fn pad(&mut self, target: usize, limit: Option<usize>) -> Result<usize> {
if target <= self.len {
return Ok(target);
}
if let Some(limit) = limit {
if target > limit {
return Err(Error::Pad(target, self.cap));
}
if self.cap < target {
self.cap = target
}
}
let new_len = std::cmp::min(target, self.cap);
unsafe {
std::ptr::write_bytes(self.ptr.add(self.len), 0, new_len - self.len);
}
self.len = new_len;
Ok(new_len)
}
}
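// SAFETY: a `BufferPart` is the sole view into its region of the pool mapping
// and only reads/writes within `cap` bytes through the raw pointer, so it may
// be moved to another thread.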
unsafe impl Send for BufferPart {}
impl Debug for BufferPart {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let BufferPart { len, cap, ptr } = &self;
f.debug_struct("BufferPart").field("ptr", ptr).field("len", len).field("cap", cap).finish()
}
}
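/// Builds the [`BufferPart`] views for every descriptor in the chain; rx
/// buffers start with `len` equal to the received data length while tx
/// buffers start empty.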
impl<K: AllocKind> From<AllocGuard<K>> for Buffer<K> {
fn from(alloc: AllocGuard<K>) -> Self {
let AllocGuard { pool, descs: _ } = &alloc;
let parts: Chained<BufferPart> = alloc
.descriptors()
.map(|descriptor| {
let offset = usize::try_from(descriptor.offset()).unwrap();
let head_length = usize::from(descriptor.head_length());
let data_length = usize::try_from(descriptor.data_length()).unwrap();
let len = match K::REFL {
AllocKindRefl::Tx => 0,
AllocKindRefl::Rx => data_length,
};
assert!(
offset + head_length <= pool.bytes,
"buffer part starts beyond the end of pool"
);
assert!(
offset + head_length + data_length <= pool.bytes,
"buffer part ends beyond the end of pool"
);
unsafe {
BufferPart::new(pool.base.as_ptr().add(offset + head_length), data_length, len)
}
})
.collect();
Self { alloc, parts, pos: 0 }
}
}
impl<K: AllocKind> Read for Buffer<K> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.read_at(self.pos, buf)
.map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidInput, error))?;
self.pos += buf.len();
Ok(buf.len())
}
}
impl Write for Buffer<Tx> {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.write_at(self.pos, buf)
.map_err(|error| std::io::Error::new(std::io::ErrorKind::InvalidInput, error))?;
self.pos += buf.len();
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
Ok(())
}
}
impl<K: AllocKind> Seek for Buffer<K> {
fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
let pos = match pos {
SeekFrom::Start(pos) => pos,
SeekFrom::End(offset) => {
let end = i64::try_from(self.cap())
.map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
u64::try_from(end.wrapping_add(offset)).unwrap()
}
SeekFrom::Current(offset) => {
let current = i64::try_from(self.pos)
.map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
u64::try_from(current.wrapping_add(offset)).unwrap()
}
};
self.pos =
usize::try_from(pos).map_err(|TryFromIntError { .. }| zx::Status::OUT_OF_RANGE)?;
Ok(pos)
}
}
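/// A queued tx allocation request: the fulfilled chain is delivered through
/// `sender` once enough descriptors are freed.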
struct TxAllocReq {
sender: Sender<AllocGuard<Tx>>,
size: ChainLength,
}
impl TxAllocReq {
fn new(size: ChainLength) -> (Self, Receiver<AllocGuard<Tx>>) {
let (sender, receiver) = channel();
(TxAllocReq { sender, size }, receiver)
}
}
mod private {
use super::{AllocKind, Rx, Tx};
pub trait Sealed: 'static + Sized {}
impl Sealed for Rx {}
impl Sealed for Tx {}
pub struct Allocation<'a, K: AllocKind>(pub(super) &'a mut super::AllocGuard<K>);
}
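/// A marker trait for the two kinds of allocations, sealed so that only `Tx`
/// and `Rx` implement it; `free` returns an allocation of the corresponding
/// kind to the pool.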
pub trait AllocKind: private::Sealed {
const REFL: AllocKindRefl;
fn free(alloc: private::Allocation<'_, Self>);
}
pub enum Tx {}
pub enum Rx {}
pub enum AllocKindRefl {
Tx,
Rx,
}
impl AllocKindRefl {
pub(in crate::session) fn as_str(&self) -> &'static str {
match self {
AllocKindRefl::Tx => "Tx",
AllocKindRefl::Rx => "Rx",
}
}
}
impl AllocKind for Tx {
const REFL: AllocKindRefl = AllocKindRefl::Tx;
fn free(alloc: private::Allocation<'_, Self>) {
let private::Allocation(AllocGuard { pool, descs }) = alloc;
pool.free_tx(std::mem::replace(descs, Chained::empty()));
}
}
impl AllocKind for Rx {
const REFL: AllocKindRefl = AllocKindRefl::Rx;
fn free(alloc: private::Allocation<'_, Self>) {
let private::Allocation(AllocGuard { pool, descs }) = alloc;
pool.free_rx(std::mem::replace(descs, Chained::empty()));
pool.rx_leases.rx_complete();
}
}
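/// State backing delayed release of rx power leases: a running count of
/// completed rx frames and a waker for the single allowed watcher.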
pub(in crate::session) struct RxLeaseHandlingState {
can_watch_rx_leases: AtomicBool,
rx_frame_counter: AtomicU64,
rx_lease_waker: AtomicWaker,
}
impl RxLeaseHandlingState {
fn new_with_flags(flags: netdev::SessionFlags) -> Self {
Self::new_with_enabled(flags.contains(netdev::SessionFlags::RECEIVE_RX_POWER_LEASES))
}
fn new_with_enabled(enabled: bool) -> Self {
Self {
can_watch_rx_leases: AtomicBool::new(enabled),
rx_frame_counter: AtomicU64::new(0),
rx_lease_waker: AtomicWaker::new(),
}
}
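/// Records that one more rx frame has been fully processed. The waker only
/// needs to fire when the counter wraps to zero, which pairs with the bias
/// that `RxLeaseWatcher::wait_until` subtracts from the counter.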
fn rx_complete(&self) {
let Self { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } = self;
let prev = rx_frame_counter.fetch_add(1, atomic::Ordering::SeqCst);
if prev == u64::MAX {
rx_lease_waker.wake();
}
}
}
pub(in crate::session) trait RxLeaseHandlingStateContainer {
fn lease_handling_state(&self) -> &RxLeaseHandlingState;
}
impl<T: Borrow<RxLeaseHandlingState>> RxLeaseHandlingStateContainer for T {
fn lease_handling_state(&self) -> &RxLeaseHandlingState {
self.borrow()
}
}
impl RxLeaseHandlingStateContainer for Arc<Pool> {
fn lease_handling_state(&self) -> &RxLeaseHandlingState {
&self.rx_leases
}
}
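/// A watcher that can wait for a given number of rx frames to be completed.
/// Only one watcher may exist per handling state, which `new` enforces by
/// clearing `can_watch_rx_leases`.
///
/// A sketch, following the `rx_leases` unit test below:
///
/// ```ignore
/// let state = RxLeaseHandlingState::new_with_enabled(true);
/// let mut watcher = RxLeaseWatcher::new(&state);
/// state.rx_complete();
/// state.rx_complete();
/// // Resolves immediately: two frames have already been completed.
/// watcher.wait_until(2).await;
/// ```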
pub(in crate::session) struct RxLeaseWatcher<T> {
state: T,
}
impl<T: RxLeaseHandlingStateContainer> RxLeaseWatcher<T> {
pub(in crate::session) fn new(state: T) -> Self {
assert!(
state.lease_handling_state().can_watch_rx_leases.swap(false, atomic::Ordering::SeqCst),
"can't watch rx leases"
);
Self { state }
}
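/// Waits until `hold_until_frame` rx frames have completed. The frame counter
/// is temporarily decremented by `hold_until_frame` (and restored by the scope
/// guard on exit) so that `rx_complete` only has to wake the watcher when the
/// biased counter wraps past zero.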
pub(in crate::session) async fn wait_until(&mut self, hold_until_frame: u64) {
let RxLeaseHandlingState { can_watch_rx_leases: _, rx_frame_counter, rx_lease_waker } =
self.state.lease_handling_state();
let prev = rx_frame_counter.fetch_sub(hold_until_frame, atomic::Ordering::SeqCst);
let _guard = scopeguard::guard((), |()| {
let _: u64 = rx_frame_counter.fetch_add(hold_until_frame, atomic::Ordering::SeqCst);
});
if prev >= hold_until_frame {
return;
}
let threshold = prev.wrapping_sub(hold_until_frame);
futures::future::poll_fn(|cx| {
let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
if v < threshold {
return Poll::Ready(());
}
rx_lease_waker.register(cx.waker());
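// Load again after registering the waker so a completion that raced with
// the registration is not missed.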
let v = rx_frame_counter.load(atomic::Ordering::SeqCst);
if v < threshold {
return Poll::Ready(());
}
Poll::Pending
})
.await;
}
}
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize};
use std::pin::pin;
use std::task::{Poll, Waker};

use assert_matches::assert_matches;
use const_unwrap::const_unwrap_option;
use fuchsia_async as fasync;
use futures::future::FutureExt;
use test_case::test_case;

use super::*;
const DEFAULT_MIN_TX_BUFFER_HEAD: u16 = 4;
const DEFAULT_MIN_TX_BUFFER_TAIL: u16 = 8;
const DEFAULT_BUFFER_LENGTH: NonZeroUsize = const_unwrap_option(NonZeroUsize::new(64));
const DEFAULT_TX_BUFFERS: NonZeroU16 = const_unwrap_option(NonZeroU16::new(8));
const DEFAULT_RX_BUFFERS: NonZeroU16 = const_unwrap_option(NonZeroU16::new(8));
const MAX_BUFFER_BYTES: usize = DEFAULT_BUFFER_LENGTH.get()
* netdev::MAX_DESCRIPTOR_CHAIN as usize
- DEFAULT_MIN_TX_BUFFER_HEAD as usize
- DEFAULT_MIN_TX_BUFFER_TAIL as usize;
const SENTINEL_BYTE: u8 = 0xab;
const WRITE_BYTE: u8 = 1;
const PAD_BYTE: u8 = 0;
const DEFAULT_CONFIG: Config = Config {
buffer_stride: const_unwrap::const_unwrap_option(NonZeroU64::new(
DEFAULT_BUFFER_LENGTH.get() as u64,
)),
num_rx_buffers: DEFAULT_RX_BUFFERS,
num_tx_buffers: DEFAULT_TX_BUFFERS,
options: netdev::SessionFlags::empty(),
buffer_layout: BufferLayout {
length: DEFAULT_BUFFER_LENGTH.get(),
min_tx_head: DEFAULT_MIN_TX_BUFFER_HEAD,
min_tx_tail: DEFAULT_MIN_TX_BUFFER_TAIL,
min_tx_data: 0,
},
};
impl Pool {
fn new_test_default() -> Arc<Self> {
let (pool, _descriptors, _data) =
Pool::new(DEFAULT_CONFIG).expect("failed to create default pool");
pool
}
async fn alloc_tx_checked(self: &Arc<Self>, n: u8) -> AllocGuard<Tx> {
self.alloc_tx(ChainLength::try_from(n).expect("failed to convert to chain length"))
.await
}
fn alloc_tx_now_or_never(self: &Arc<Self>, n: u8) -> Option<AllocGuard<Tx>> {
self.alloc_tx_checked(n).now_or_never()
}
fn alloc_tx_all(self: &Arc<Self>, n: u8) -> Vec<AllocGuard<Tx>> {
std::iter::from_fn(|| self.alloc_tx_now_or_never(n)).collect()
}
fn alloc_tx_buffer_now_or_never(self: &Arc<Self>, num_bytes: usize) -> Option<Buffer<Tx>> {
self.alloc_tx_buffer(num_bytes)
.now_or_never()
.transpose()
.expect("invalid arguments for alloc_tx_buffer")
}
fn set_min_tx_buffer_length(self: &mut Arc<Self>, length: usize) {
Arc::get_mut(self).unwrap().buffer_layout.min_tx_data = length;
}
fn fill_sentinel_bytes(&mut self) {
unsafe { std::ptr::write_bytes(self.base.as_ptr(), SENTINEL_BYTE, self.bytes) };
}
}
impl Buffer<Tx> {
fn check_write_and_pad(&mut self, offset: usize, pad_size: usize) {
self.write_at(offset, &[WRITE_BYTE][..]).expect("failed to write to self");
self.pad().expect("failed to pad");
assert_eq!(self.len(), pad_size);
const INIT_BYTE: u8 = 42;
let mut read_buf = vec![INIT_BYTE; pad_size];
self.read_at(0, &mut read_buf[..]).expect("failed to read from self");
for (idx, byte) in read_buf.iter().enumerate() {
if idx < offset {
assert_eq!(*byte, SENTINEL_BYTE);
} else if idx == offset {
assert_eq!(*byte, WRITE_BYTE);
} else {
assert_eq!(*byte, PAD_BYTE);
}
}
}
}
impl<K, I, T> PartialEq<T> for Chained<DescId<K>>
where
K: AllocKind,
I: ExactSizeIterator<Item = u16>,
T: Copy + IntoIterator<IntoIter = I>,
{
fn eq(&self, other: &T) -> bool {
let iter = other.into_iter();
if usize::from(self.len) != iter.len() {
return false;
}
self.iter().zip(iter).all(|(l, r)| l.get() == r)
}
}
impl Debug for TxAllocReq {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let TxAllocReq { sender: _, size } = self;
f.debug_struct("TxAllocReq").field("size", &size).finish_non_exhaustive()
}
}
#[test]
fn alloc_tx_distinct() {
let pool = Pool::new_test_default();
let allocated = pool.alloc_tx_all(1);
assert_eq!(allocated.len(), DEFAULT_TX_BUFFERS.get().into());
let distinct = allocated
.iter()
.map(|alloc| {
assert_eq!(alloc.descs.len(), 1);
alloc.descs[0].get()
})
.collect::<HashSet<u16>>();
assert_eq!(allocated.len(), distinct.len());
}
#[test]
fn alloc_tx_free_len() {
let pool = Pool::new_test_default();
{
let allocated = pool.alloc_tx_all(2);
assert_eq!(
allocated.iter().fold(0, |acc, a| { acc + a.descs.len() }),
DEFAULT_TX_BUFFERS.get().into()
);
assert_eq!(pool.tx_alloc_state.lock().free_list.len, 0);
}
assert_eq!(pool.tx_alloc_state.lock().free_list.len, DEFAULT_TX_BUFFERS.get());
}
#[test]
fn alloc_tx_chain() {
let pool = Pool::new_test_default();
let allocated = pool.alloc_tx_all(3);
assert_eq!(allocated.len(), usize::from(DEFAULT_TX_BUFFERS.get()) / 3);
assert_matches!(pool.alloc_tx_now_or_never(3), None);
assert_matches!(pool.alloc_tx_now_or_never(2), Some(a) if a.descs.len() == 2);
}
#[test]
fn alloc_tx_many() {
let pool = Pool::new_test_default();
let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
- u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
- u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
let data_len = usize::try_from(data_len).unwrap();
let mut buffers = pool
.alloc_tx_buffers(data_len)
.now_or_never()
.expect("failed to alloc")
.unwrap()
.collect::<Result<Vec<_>>>()
.expect("buffer error");
assert_eq!(buffers.len(), DEFAULT_TX_BUFFERS.get().into());
assert!(pool.alloc_tx_buffers(data_len).now_or_never().is_none());
assert_matches!(buffers.pop(), Some(_));
let mut more_buffers =
pool.alloc_tx_buffers(data_len).now_or_never().expect("failed to alloc").unwrap();
let buffer = assert_matches!(more_buffers.next(), Some(Ok(b)) => b);
assert_matches!(more_buffers.next(), None);
drop(buffer);
assert_matches!(more_buffers.next(), None);
}
#[test]
fn alloc_tx_after_free() {
let pool = Pool::new_test_default();
let mut allocated = pool.alloc_tx_all(1);
assert_matches!(pool.alloc_tx_now_or_never(2), None);
{
let _drained = allocated.drain(..2);
}
assert_matches!(pool.alloc_tx_now_or_never(2), Some(a) if a.descs.len() == 2);
}
#[test]
fn blocking_alloc_tx() {
let mut executor = fasync::TestExecutor::new();
let pool = Pool::new_test_default();
let mut allocated = pool.alloc_tx_all(1);
let alloc_fut = pool.alloc_tx_checked(1);
let mut alloc_fut = pin!(alloc_fut);
assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
assert!(!pool.tx_alloc_state.lock().requests.is_empty());
let freed = allocated
.pop()
.expect("no fulfilled allocations")
.iter()
.map(|x| x.get())
.collect::<Chained<_>>();
let same_as_freed =
|descs: &Chained<DescId<Tx>>| descs.iter().map(|x| x.get()).eq(freed.iter().copied());
assert_matches!(
&executor.run_until_stalled(&mut alloc_fut),
Poll::Ready(AllocGuard{ descs, pool: _ }) if same_as_freed(descs)
);
assert!(pool.tx_alloc_state.lock().requests.is_empty());
}
#[test]
fn blocking_alloc_tx_cancel_before_free() {
let mut executor = fasync::TestExecutor::new();
let pool = Pool::new_test_default();
let mut allocated = pool.alloc_tx_all(1);
{
let alloc_fut = pool.alloc_tx_checked(1);
let mut alloc_fut = pin!(alloc_fut);
assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
assert_matches!(
pool.tx_alloc_state.lock().requests.as_slices(),
(&[ref req1, ref req2], &[]) if req1.size.get() == 1 && req2.size.get() == 1
);
}
assert_matches!(
allocated.pop(),
Some(AllocGuard { ref descs, pool: ref p })
if descs == &[DEFAULT_TX_BUFFERS.get() - 1] && Arc::ptr_eq(p, &pool)
);
let state = pool.tx_alloc_state.lock();
assert_eq!(state.free_list.len, 1);
assert!(state.requests.is_empty());
}
#[test]
fn blocking_alloc_tx_cancel_after_free() {
let mut executor = fasync::TestExecutor::new();
let pool = Pool::new_test_default();
let mut allocated = pool.alloc_tx_all(1);
{
let alloc_fut = pool.alloc_tx_checked(1);
let mut alloc_fut = pin!(alloc_fut);
assert_matches!(executor.run_until_stalled(&mut alloc_fut), Poll::Pending);
assert_matches!(
pool.tx_alloc_state.lock().requests.as_slices(),
(&[ref req1, ref req2], &[]) if req1.size.get() == 1 && req2.size.get() == 1
);
assert_matches!(
allocated.pop(),
Some(AllocGuard { ref descs, pool: ref p })
if descs == &[DEFAULT_TX_BUFFERS.get() - 1] && Arc::ptr_eq(p, &pool)
);
}
let state = pool.tx_alloc_state.lock();
assert_eq!(state.free_list.len, 1);
assert!(state.requests.is_empty());
}
#[test]
fn multiple_blocking_alloc_tx_fulfill_order() {
const TASKS_TOTAL: usize = 3;
let mut executor = fasync::TestExecutor::new();
let pool = Pool::new_test_default();
let mut allocated = pool.alloc_tx_all(1);
let mut alloc_futs = (1..=TASKS_TOTAL)
.rev()
.map(|x| {
let pool = pool.clone();
(x, Box::pin(async move { pool.alloc_tx_checked(x.try_into().unwrap()).await }))
})
.collect::<Vec<_>>();
for (idx, (req_size, task)) in alloc_futs.iter_mut().enumerate() {
assert_matches!(executor.run_until_stalled(task), Poll::Pending);
assert_eq!(idx + *req_size, TASKS_TOTAL);
}
{
let state = pool.tx_alloc_state.lock();
assert_eq!(state.requests.len(), TASKS_TOTAL + 1);
let mut requests = state.requests.iter();
assert!(requests.next().unwrap().sender.is_canceled());
assert!(requests.all(|req| !req.sender.is_canceled()))
}
let mut to_free = Vec::new();
let mut freed = 0;
for free_size in (1..=TASKS_TOTAL).rev() {
let (_req_size, mut task) = alloc_futs.remove(0);
for _ in 1..free_size {
freed += 1;
assert_matches!(
allocated.pop(),
Some(AllocGuard { ref descs, pool: ref p })
if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
);
assert_matches!(executor.run_until_stalled(&mut task), Poll::Pending);
}
freed += 1;
assert_matches!(
allocated.pop(),
Some(AllocGuard { ref descs, pool: ref p })
if descs == &[DEFAULT_TX_BUFFERS.get() - freed] && Arc::ptr_eq(p, &pool)
);
match executor.run_until_stalled(&mut task) {
Poll::Ready(alloc) => {
assert_eq!(alloc.len(), free_size);
to_free.push(alloc);
}
Poll::Pending => panic!("The request should be fulfilled"),
}
for (_req_size, task) in alloc_futs.iter_mut() {
assert_matches!(executor.run_until_stalled(task), Poll::Pending);
}
}
assert!(pool.tx_alloc_state.lock().requests.is_empty());
}
#[test]
fn singleton_tx_layout() {
let pool = Pool::new_test_default();
let buffers = std::iter::from_fn(|| {
let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
- u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
- u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
pool.alloc_tx_buffer_now_or_never(usize::try_from(data_len).unwrap()).map(|buffer| {
assert_eq!(buffer.alloc.descriptors().count(), 1);
let offset = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
* u64::from(buffer.alloc[0].get());
{
let descriptor = buffer.alloc.descriptor();
assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
assert_eq!(descriptor.head_length(), DEFAULT_MIN_TX_BUFFER_HEAD);
assert_eq!(descriptor.tail_length(), DEFAULT_MIN_TX_BUFFER_TAIL);
assert_eq!(descriptor.data_length(), data_len);
assert_eq!(descriptor.offset(), offset);
}
assert_eq!(buffer.parts.len(), 1);
let BufferPart { ptr, len, cap } = buffer.parts[0];
assert_eq!(len, 0);
assert_eq!(
pool.base.as_ptr().wrapping_add(
usize::try_from(offset).unwrap() + usize::from(DEFAULT_MIN_TX_BUFFER_HEAD),
),
ptr
);
assert_eq!(data_len, u32::try_from(cap).unwrap());
buffer
})
})
.collect::<Vec<_>>();
assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()));
}
#[test]
fn chained_tx_layout() {
let pool = Pool::new_test_default();
let alloc_len = 4 * DEFAULT_BUFFER_LENGTH.get()
- usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
- usize::from(DEFAULT_MIN_TX_BUFFER_TAIL);
let buffers = std::iter::from_fn(|| {
pool.alloc_tx_buffer_now_or_never(alloc_len).map(|buffer| {
assert_eq!(buffer.parts.len(), 4);
for (idx, descriptor) in buffer.alloc.descriptors().enumerate() {
let chain_length = ChainLength::try_from(buffer.alloc.len() - idx - 1).unwrap();
let head_length = if idx == 0 { DEFAULT_MIN_TX_BUFFER_HEAD } else { 0 };
let tail_length = if chain_length == ChainLength::ZERO {
DEFAULT_MIN_TX_BUFFER_TAIL
} else {
0
};
let data_len = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
- u32::from(head_length)
- u32::from(tail_length);
let offset = u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
* u64::from(buffer.alloc[idx].get());
assert_eq!(descriptor.chain_length().unwrap(), chain_length);
assert_eq!(descriptor.head_length(), head_length);
assert_eq!(descriptor.tail_length(), tail_length);
assert_eq!(descriptor.offset(), offset);
assert_eq!(descriptor.data_length(), data_len);
if chain_length != ChainLength::ZERO {
assert_eq!(descriptor.nxt(), Some(buffer.alloc[idx + 1].get()));
}
let BufferPart { ptr, cap, len } = buffer.parts[idx];
assert_eq!(len, 0);
assert_eq!(
pool.base.as_ptr().wrapping_add(
usize::try_from(offset).unwrap() + usize::from(head_length),
),
ptr
);
assert_eq!(data_len, u32::try_from(cap).unwrap());
}
buffer
})
})
.collect::<Vec<_>>();
assert_eq!(buffers.len(), usize::from(DEFAULT_TX_BUFFERS.get()) / 4);
}
#[test]
fn rx_distinct() {
let pool = Pool::new_test_default();
let mut guard = pool.rx_pending.inner.lock();
let (descs, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
assert_eq!(descs.len(), usize::from(DEFAULT_RX_BUFFERS.get()));
let distinct = descs.iter().map(|desc| desc.get()).collect::<HashSet<u16>>();
assert_eq!(descs.len(), distinct.len());
}
#[test]
fn alloc_rx_layout() {
let pool = Pool::new_test_default();
let mut guard = pool.rx_pending.inner.lock();
let (descs, _): &mut (Vec<_>, Option<Waker>) = &mut *guard;
assert_eq!(descs.len(), usize::from(DEFAULT_RX_BUFFERS.get()));
for desc in descs.iter() {
let descriptor = pool.descriptors.borrow(desc);
let offset =
u64::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap() * u64::from(desc.get());
assert_matches!(descriptor.chain_length(), Ok(ChainLength::ZERO));
assert_eq!(descriptor.head_length(), 0);
assert_eq!(descriptor.tail_length(), 0);
assert_eq!(descriptor.offset(), offset);
assert_eq!(
descriptor.data_length(),
u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
);
}
}
#[test]
fn buffer_read_at_write_at() {
let pool = Pool::new_test_default();
let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
let mut buffer =
pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
assert_eq!(buffer.parts.len(), 2);
assert_eq!(buffer.cap(), alloc_bytes);
let write_buf = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();
buffer.write_at(0, &write_buf[..]).expect("failed to write into buffer");
let mut read_buf = [0xff; DEFAULT_BUFFER_LENGTH.get()];
buffer.read_at(0, &mut read_buf[..]).expect("failed to read from buffer");
for (idx, byte) in read_buf.iter().enumerate() {
assert_eq!(*byte, write_buf[idx]);
}
}
#[test]
fn buffer_read_write_seek() {
let pool = Pool::new_test_default();
let alloc_bytes = DEFAULT_BUFFER_LENGTH.get();
let mut buffer =
pool.alloc_tx_buffer_now_or_never(alloc_bytes).expect("failed to allocate");
assert_eq!(buffer.parts.len(), 2);
assert_eq!(buffer.cap(), alloc_bytes);
let write_buf = (0..u8::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()).collect::<Vec<_>>();
assert_eq!(
buffer.write(&write_buf[..]).expect("failed to write into buffer"),
write_buf.len()
);
const SEEK_FROM_END: usize = 64;
const READ_LEN: usize = 12;
assert_eq!(
buffer.seek(SeekFrom::End(-i64::try_from(SEEK_FROM_END).unwrap())).unwrap(),
u64::try_from(buffer.cap() - SEEK_FROM_END).unwrap()
);
let mut read_buf = [0xff; READ_LEN];
assert_eq!(
buffer.read(&mut read_buf[..]).expect("failed to read from buffer"),
read_buf.len()
);
assert_eq!(&write_buf[..READ_LEN], &read_buf[..]);
}
#[test_case(32; "single buffer part")]
#[test_case(MAX_BUFFER_BYTES; "multiple buffer parts")]
fn buffer_pad(pad_size: usize) {
let mut pool = Pool::new_test_default();
pool.set_min_tx_buffer_length(pad_size);
for offset in 0..pad_size {
Arc::get_mut(&mut pool)
.expect("there are multiple owners of the underlying VMO")
.fill_sentinel_bytes();
let mut buffer =
pool.alloc_tx_buffer_now_or_never(pad_size).expect("failed to allocate buffer");
buffer.check_write_and_pad(offset, pad_size);
}
}
#[test]
fn buffer_pad_grow() {
const BUFFER_PARTS: u8 = 3;
let mut pool = Pool::new_test_default();
let pad_size = u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap()
* u32::from(BUFFER_PARTS)
- u32::from(DEFAULT_MIN_TX_BUFFER_HEAD)
- u32::from(DEFAULT_MIN_TX_BUFFER_TAIL);
pool.set_min_tx_buffer_length(pad_size.try_into().unwrap());
for offset in 0..pad_size - u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap() {
Arc::get_mut(&mut pool)
.expect("there are multiple owners of the underlying VMO")
.fill_sentinel_bytes();
let mut alloc =
pool.alloc_tx_now_or_never(BUFFER_PARTS).expect("failed to alloc descriptors");
alloc
.init(
DEFAULT_BUFFER_LENGTH.get() * usize::from(BUFFER_PARTS)
- usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
- usize::from(DEFAULT_MIN_TX_BUFFER_TAIL),
)
.expect("head/body/tail sizes are representable with u16/u32/u16");
let mut buffer = Buffer::try_from(alloc).unwrap();
buffer.check_write_and_pad(offset.try_into().unwrap(), pad_size.try_into().unwrap());
}
}
#[test_case( 0; "writes at the beginning")]
#[test_case( 15; "writes in the first part")]
#[test_case( 75; "writes in the second part")]
#[test_case(135; "writes in the third part")]
#[test_case(195; "writes in the last part")]
fn buffer_used(write_offset: usize) {
let pool = Pool::new_test_default();
let mut buffer =
pool.alloc_tx_buffer_now_or_never(MAX_BUFFER_BYTES).expect("failed to allocate buffer");
let expected_caps = (0..netdev::MAX_DESCRIPTOR_CHAIN).map(|i| {
if i == 0 {
DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_HEAD)
} else if i < netdev::MAX_DESCRIPTOR_CHAIN - 1 {
DEFAULT_BUFFER_LENGTH.get()
} else {
DEFAULT_BUFFER_LENGTH.get() - usize::from(DEFAULT_MIN_TX_BUFFER_TAIL)
}
});
assert_eq!(buffer.alloc.len(), netdev::MAX_DESCRIPTOR_CHAIN.into());
buffer.write_at(write_offset, &[WRITE_BYTE][..]).expect("failed to write to buffer");
assert_eq!(
buffer.parts.iter().zip(expected_caps).fold(
Some(write_offset),
|offset, (part, expected_cap)| {
assert_eq!(part.cap, expected_cap);
match offset {
Some(offset) => {
if offset >= expected_cap {
assert_eq!(part.len, part.cap);
Some(offset - part.len)
} else {
assert_eq!(part.len, offset + 1);
let mut buf = [0];
assert_matches!(part.read_at(offset, &mut buf), Ok(1));
assert_eq!(buf[0], WRITE_BYTE);
None
}
}
None => {
assert_eq!(part.len, 0);
None
}
}
}
),
None
)
}
#[test]
fn buffer_commit() {
let pool = Pool::new_test_default();
for offset in 0..MAX_BUFFER_BYTES {
let mut buffer = pool
.alloc_tx_buffer_now_or_never(MAX_BUFFER_BYTES)
.expect("failed to allocate buffer");
buffer.write_at(offset, &[1][..]).expect("failed to write to buffer");
buffer.commit();
for (part, descriptor) in buffer.parts.iter().zip(buffer.alloc.descriptors()) {
let head_length = descriptor.head_length();
let tail_length = descriptor.tail_length();
let data_length = descriptor.data_length();
assert_eq!(u32::try_from(part.len).unwrap(), data_length);
assert_eq!(
u32::from(head_length + tail_length) + data_length,
u32::try_from(DEFAULT_BUFFER_LENGTH.get()).unwrap(),
);
}
}
}
#[test]
fn allocate_under_device_minimum() {
const MIN_TX_DATA: usize = 32;
const ALLOC_SIZE: usize = 16;
const WRITE_BYTE: u8 = 0xff;
const WRITE_SENTINEL_BYTE: u8 = 0xee;
const READ_SENTINEL_BYTE: u8 = 0xdd;
let mut config = DEFAULT_CONFIG;
config.buffer_layout.min_tx_data = MIN_TX_DATA;
let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create a new pool");
for mut buffer in Vec::from_iter(std::iter::from_fn({
let pool = pool.clone();
move || pool.alloc_tx_buffer_now_or_never(MIN_TX_DATA)
})) {
buffer.write_at(0, &[WRITE_SENTINEL_BYTE; MIN_TX_DATA]).expect("failed to write");
}
let mut allocated =
pool.alloc_tx_buffer_now_or_never(16).expect("failed to allocate buffer");
assert_eq!(allocated.cap(), ALLOC_SIZE);
const WRITE_BUF_SIZE: usize = ALLOC_SIZE + 1;
assert_matches!(
allocated.write_at(0, &[WRITE_BYTE; WRITE_BUF_SIZE]),
Err(Error::TooSmall { size: ALLOC_SIZE, offset: 0, length: WRITE_BUF_SIZE })
);
allocated.write_at(0, &[WRITE_BYTE; ALLOC_SIZE]).expect("failed to write to buffer");
assert_matches!(allocated.pad(), Ok(()));
assert_eq!(allocated.cap(), MIN_TX_DATA);
assert_eq!(allocated.len(), MIN_TX_DATA);
const READ_BUF_SIZE: usize = MIN_TX_DATA + 1;
let mut read_buf = [READ_SENTINEL_BYTE; READ_BUF_SIZE];
assert_matches!(
allocated.read_at(0, &mut read_buf[..]),
Err(Error::TooSmall { size: MIN_TX_DATA, offset: 0, length: READ_BUF_SIZE })
);
allocated.read_at(0, &mut read_buf[..MIN_TX_DATA]).expect("failed to read from buffer");
assert_eq!(&read_buf[..ALLOC_SIZE], &[WRITE_BYTE; ALLOC_SIZE][..]);
assert_eq!(&read_buf[ALLOC_SIZE..MIN_TX_DATA], &[0x0; ALLOC_SIZE][..]);
assert_eq!(&read_buf[MIN_TX_DATA..], &[READ_SENTINEL_BYTE; 1][..]);
}
#[test]
fn invalid_tx_length() {
let mut config = DEFAULT_CONFIG;
config.buffer_layout.length = usize::from(u16::MAX) + 2;
config.buffer_layout.min_tx_head = 0;
let (pool, _descriptors, _vmo) = Pool::new(config).expect("failed to create pool");
assert_matches!(pool.alloc_tx_buffer(1).now_or_never(), Some(Err(Error::TxLength)));
}
#[test]
fn rx_leases() {
let mut executor = fuchsia_async::TestExecutor::new();
let state = RxLeaseHandlingState::new_with_enabled(true);
let mut watcher = RxLeaseWatcher { state: &state };
{
let mut fut = pin!(watcher.wait_until(0));
assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
}
{
state.rx_complete();
let mut fut = pin!(watcher.wait_until(1));
assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
}
{
let mut fut = pin!(watcher.wait_until(0));
assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
}
{
let mut fut = pin!(watcher.wait_until(3));
assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
state.rx_complete();
assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
state.rx_complete();
assert_eq!(executor.run_until_stalled(&mut fut), Poll::Ready(()));
}
let counter_before = state.rx_frame_counter.load(atomic::Ordering::SeqCst);
{
let mut fut = pin!(watcher.wait_until(10000));
assert_eq!(executor.run_until_stalled(&mut fut), Poll::Pending);
}
let counter_after = state.rx_frame_counter.load(atomic::Ordering::SeqCst);
assert_eq!(counter_before, counter_after);
}
}