use {
anyhow::{ensure, Error},
async_trait::async_trait,
fidl_fuchsia_hardware_block as block, fidl_fuchsia_hardware_block_driver as block_driver,
fuchsia_async::{self as fasync, FifoReadable as _, FifoWritable as _},
fuchsia_zircon::sys::zx_handle_t,
fuchsia_zircon::{self as zx, HandleBased as _},
futures::{channel::oneshot, executor::block_on},
lazy_static::lazy_static,
std::{
collections::HashMap,
future::Future,
hash::{Hash, Hasher},
ops::{DerefMut, Range},
pin::Pin,
sync::{
atomic::{AtomicU16, Ordering},
Arc, Mutex,
},
task::{Context, Poll, Waker},
},
storage_trace::{self as trace, TraceFutureExt},
};
pub use cache::Cache;
pub use block::Flag as BlockFlags;
pub mod fifo;
pub use fifo::*;
pub mod cache;
pub mod testing;
const TEMP_VMO_SIZE: usize = 65536;
pub use block_driver::{BlockIoFlag, BlockOpcode};
/// Maps a raw block opcode byte to a human-readable name for trace labels.
///
/// Returns `"unknown"` for opcodes that do not correspond to a
/// [`BlockOpcode`] variant.
fn opcode_str(opcode: u8) -> &'static str {
    let Some(op) = BlockOpcode::from_primitive(opcode) else {
        return "unknown";
    };
    match op {
        BlockOpcode::Read => "read",
        BlockOpcode::Write => "write",
        BlockOpcode::Flush => "flush",
        BlockOpcode::Trim => "trim",
        BlockOpcode::CloseVmo => "close_vmo",
    }
}
/// Generates a trace flow ID for a block FIFO request.
///
/// The ID packs this process's raw handle into the low 32 bits and the
/// per-connection `request_id` into the high 32 bits, so flow IDs from
/// different processes sharing a trace session do not collide.
fn generate_trace_flow_id(request_id: u32) -> u64 {
    lazy_static! {
        // The process handle is stable for the lifetime of the process, so
        // fetch it once.
        static ref SELF_HANDLE: zx_handle_t = fuchsia_runtime::process_self().raw_handle();
    }
    // NB: the parentheses are load-bearing. In Rust `a + b << 32` parses as
    // `(a + b) << 32`, which would shift the handle bits out entirely and
    // leave the low word zero for every request.
    *SELF_HANDLE as u64 + ((request_id as u64) << 32)
}
/// A source buffer for a write: either a byte range within a VMO previously
/// attached to the device (referenced by [`VmoId`]), or a plain memory slice
/// that will be staged through an internal transfer VMO.
pub enum BufferSlice<'a> {
    // `offset` and `length` are in bytes and must be block-aligned when used
    // for I/O.
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    Memory(&'a [u8]),
}

impl<'a> BufferSlice<'a> {
    /// Creates a `BufferSlice` referencing `length` bytes starting at
    /// `offset` within the VMO identified by `vmo_id`.
    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
        BufferSlice::VmoId { vmo_id, offset, length }
    }
}

impl<'a> From<&'a [u8]> for BufferSlice<'a> {
    fn from(buf: &'a [u8]) -> Self {
        BufferSlice::Memory(buf)
    }
}

/// A destination buffer for a read: the mutable counterpart of
/// [`BufferSlice`].
pub enum MutableBufferSlice<'a> {
    // `offset` and `length` are in bytes and must be block-aligned when used
    // for I/O.
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    Memory(&'a mut [u8]),
}

impl<'a> MutableBufferSlice<'a> {
    /// Creates a `MutableBufferSlice` referencing `length` bytes starting at
    /// `offset` within the VMO identified by `vmo_id`.
    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
        MutableBufferSlice::VmoId { vmo_id, offset, length }
    }
}

impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
    fn from(buf: &'a mut [u8]) -> Self {
        MutableBufferSlice::Memory(buf)
    }
}
/// Per-request bookkeeping shared between the request issuer
/// ([`ResponseFuture`]) and the FIFO poller task.
#[derive(Default)]
struct RequestState {
    // Set once when the response arrives, or to CANCELED on termination.
    result: Option<zx::Status>,
    // Waker of the `ResponseFuture` awaiting this request, if it has polled.
    waker: Option<Waker>,
}

/// State shared (behind a `Mutex`) between clients issuing requests and the
/// [`FifoPoller`] task that drives the FIFO.
#[derive(Default)]
struct FifoState {
    // The FIFO; `None` once the connection has terminated.
    fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,
    // Next (wrapping) request ID, used to match responses to requests.
    next_request_id: u32,
    // Requests queued but not yet written to the FIFO.
    queue: std::collections::VecDeque<BlockFifoRequest>,
    // In-flight requests, keyed by request ID.
    map: HashMap<u32, RequestState>,
    // Waker used to kick the poller when new requests are queued.
    poller_waker: Option<Waker>,
}
impl FifoState {
    /// Shuts down the connection: drops the FIFO, completes every in-flight
    /// request with `CANCELED` (unless it already has a result) and wakes its
    /// waiter, then wakes the poller so it can observe termination.
    fn terminate(&mut self) {
        self.fifo.take();
        for (_, request_state) in self.map.iter_mut() {
            // Don't clobber a result that already arrived.
            request_state.result.get_or_insert(zx::Status::CANCELED);
            if let Some(waker) = request_state.waker.take() {
                waker.wake();
            }
        }
        if let Some(waker) = self.poller_waker.take() {
            waker.wake();
        }
    }

    /// Writes queued requests to the FIFO until the queue drains or the write
    /// would block. Returns true if polling should terminate (the FIFO is
    /// gone or a write failed).
    fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
        let fifo = if let Some(fifo) = self.fifo.as_ref() {
            fifo
        } else {
            return true;
        };
        loop {
            // Only the front contiguous run of the deque is written per
            // iteration; the loop continues until empty or Pending.
            let slice = self.queue.as_slices().0;
            if slice.is_empty() {
                return false;
            }
            match fifo.write(context, slice) {
                Poll::Ready(Ok(sent)) => {
                    self.queue.drain(0..sent);
                }
                Poll::Ready(Err(_)) => {
                    self.terminate();
                    return true;
                }
                Poll::Pending => {
                    return false;
                }
            }
        }
    }
}
// Shared handle to the FIFO state.
type FifoStateRef = Arc<Mutex<FifoState>>;

/// Future that resolves when the response for `request_id` arrives, or with
/// `CANCELED` if the connection terminates first.
struct ResponseFuture {
    request_id: u32,
    fifo_state: FifoStateRef,
}

impl ResponseFuture {
    fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
        ResponseFuture { request_id, fifo_state }
    }
}

impl Future for ResponseFuture {
    type Output = Result<(), zx::Status>;
    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.fifo_state.lock().unwrap();
        // The map entry is inserted before this future is created and removed
        // only by our Drop impl, so it must be present.
        let request_state = state.map.get_mut(&self.request_id).unwrap();
        if let Some(result) = request_state.result {
            Poll::Ready(result.into())
        } else {
            request_state.waker.replace(context.waker().clone());
            Poll::Pending
        }
    }
}

impl Drop for ResponseFuture {
    fn drop(&mut self) {
        // Remove our entry so cancelled/abandoned requests don't leak state.
        self.fifo_state.lock().unwrap().map.remove(&self.request_id).unwrap();
    }
}
/// An identifier for a VMO attached to a block device session.
///
/// `VmoId` is move-only in spirit: it must be consumed via [`VmoId::take`] or
/// [`VmoId::into_id`] (typically by detaching it from the device) before it
/// is dropped — dropping a still-valid ID panics.
#[derive(Debug)]
pub struct VmoId(AtomicU16);

impl VmoId {
    /// Wraps a raw VMO id received from the block server.
    pub fn new(id: u16) -> Self {
        Self(AtomicU16::new(id))
    }

    /// Takes the ID, leaving `self` invalid so it can be dropped safely.
    pub fn take(&self) -> Self {
        Self(AtomicU16::new(self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)))
    }

    /// Returns true if this ID still refers to an attached VMO.
    pub fn is_valid(&self) -> bool {
        self.id() != block_driver::BLOCK_VMOID_INVALID
    }

    /// Consumes the ID, returning the raw value and leaving `self` invalid.
    pub fn into_id(self) -> u16 {
        self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)
    }

    /// Returns the raw id without invalidating it.
    pub fn id(&self) -> u16 {
        self.0.load(Ordering::Relaxed)
    }
}

impl PartialEq for VmoId {
    fn eq(&self, other: &Self) -> bool {
        self.id() == other.id()
    }
}

impl Eq for VmoId {}

impl Drop for VmoId {
    fn drop(&mut self) {
        // Enforce the "consume before drop" invariant described on the type.
        assert_eq!(
            self.0.load(Ordering::Relaxed),
            block_driver::BLOCK_VMOID_INVALID,
            "Did you forget to detach?"
        );
    }
}

impl Hash for VmoId {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.id().hash(state);
    }
}
/// A typed, async interface to a block device.
///
/// All byte offsets and lengths passed to I/O methods must be multiples of
/// the device block size.
#[async_trait]
pub trait BlockClient: Send + Sync {
    /// Attaches `vmo` to the device; I/O can then reference the returned
    /// [`VmoId`].
    async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, Error>;
    /// Detaches the VMO from the device, consuming the ID.
    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), Error>;
    /// Reads from the device at byte offset `device_offset` into
    /// `buffer_slice`.
    async fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), Error>;
    /// Writes `buffer_slice` to the device at byte offset `device_offset`.
    async fn write_at(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), Error>;
    /// Trims the given byte range. Default implementation is a no-op.
    async fn trim(&self, _device_range: Range<u64>) -> Result<(), Error> {
        Ok(())
    }
    /// Flushes device caches.
    async fn flush(&self) -> Result<(), Error>;
    /// Closes the connection to the device.
    async fn close(&self) -> Result<(), Error>;
    /// Device block size in bytes.
    fn block_size(&self) -> u32;
    /// Device size in blocks.
    fn block_count(&self) -> u64;
    /// Device flags; defaults to empty.
    fn block_flags(&self) -> BlockFlags {
        BlockFlags::default()
    }
    /// Returns true while the FIFO connection is alive.
    fn is_connected(&self) -> bool;
}
/// Implementation shared by the async and sync remote block clients: device
/// geometry plus the FIFO plumbing and a scratch VMO for memory-backed I/O.
struct Common {
    block_size: u32,
    block_count: u64,
    block_flags: BlockFlags,
    fifo_state: FifoStateRef,
    // Scratch VMO used to stage `Memory` buffer slices; guarded by an async
    // mutex because it is held across awaits.
    temp_vmo: futures::lock::Mutex<zx::Vmo>,
    temp_vmo_id: VmoId,
}
impl Common {
    /// Converts a byte count/offset into blocks, failing if it is not
    /// block-aligned.
    fn to_blocks(&self, bytes: u64) -> Result<u64, Error> {
        ensure!(bytes % self.block_size as u64 == 0, "bad alignment");
        Ok(bytes / self.block_size as u64)
    }

    /// Queues `request` on the FIFO and waits for its response.
    ///
    /// Returns an error wrapping `CANCELED` if the connection has terminated
    /// (or terminates while the request is in flight).
    async fn send(&self, mut request: BlockFifoRequest) -> Result<(), Error> {
        let trace_args = storage_trace::trace_future_args!(
            c"storage",
            c"BlockOp",
            "op" => opcode_str(request.command.opcode)
        );
        async move {
            let (request_id, trace_flow_id) = {
                let mut state = self.fifo_state.lock().unwrap();
                if state.fifo.is_none() {
                    // Connection has terminated; don't queue new work.
                    return Err(zx::Status::CANCELED.into());
                }
                trace::duration!(c"storage", c"BlockOp::start");
                let request_id = state.next_request_id;
                let trace_flow_id = generate_trace_flow_id(request_id);
                // IDs may wrap; the map insert below asserts against reuse of
                // an ID that is still in flight.
                state.next_request_id = state.next_request_id.wrapping_add(1);
                assert!(
                    state.map.insert(request_id, RequestState::default()).is_none(),
                    "request id in use!"
                );
                request.reqid = request_id;
                // Reuse the flow ID computed above instead of recomputing it,
                // so the request and the trace events always agree.
                request.trace_flow_id = trace_flow_id;
                trace::flow_begin!(c"storage", c"BlockOp", trace_flow_id);
                state.queue.push_back(request);
                // Kick the poller so the request is written out promptly even
                // if it is currently parked.
                if let Some(waker) = state.poller_waker.clone() {
                    state.poll_send_requests(&mut Context::from_waker(&waker));
                }
                (request_id, trace_flow_id)
            };
            ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
            trace::duration!(c"storage", c"BlockOp::end");
            trace::flow_end!(c"storage", c"BlockOp", trace_flow_id);
            Ok(())
        }
        .trace(trace_args)
        .await
    }
}
impl Common {
    /// Builds the shared state, spawning a [`FifoPoller`] task to drive the
    /// FIFO in the background.
    fn new(
        fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
        info: &block::BlockInfo,
        temp_vmo: zx::Vmo,
        temp_vmo_id: VmoId,
    ) -> Self {
        let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
        fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
        Self {
            block_size: info.block_size,
            block_count: info.block_count,
            block_flags: info.flags,
            fifo_state,
            temp_vmo: futures::lock::Mutex::new(temp_vmo),
            temp_vmo_id,
        }
    }

    /// Sends a CloseVmo request for `vmo_id`, consuming the ID.
    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), Error> {
        self.send(BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: BlockOpcode::CloseVmo.into_primitive(),
                flags: 0,
                ..Default::default()
            },
            vmoid: vmo_id.into_id(),
            ..Default::default()
        })
        .await
    }

    /// Reads from `device_offset` (bytes, block-aligned) into `buffer_slice`.
    ///
    /// A `Memory` slice is serviced in chunks of at most `TEMP_VMO_SIZE`
    /// bytes staged through the scratch VMO; the scratch VMO lock is held for
    /// the whole transfer so chunks from different calls don't interleave.
    async fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), Error> {
        match buffer_slice {
            MutableBufferSlice::VmoId { vmo_id, offset, length } => {
                self.send(BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Read.into_primitive(),
                        flags: 0,
                        ..Default::default()
                    },
                    vmoid: vmo_id.id(),
                    length: self.to_blocks(length)?.try_into()?,
                    vmo_offset: self.to_blocks(offset)?,
                    dev_offset: self.to_blocks(device_offset)?,
                    ..Default::default()
                })
                .await?
            }
            MutableBufferSlice::Memory(mut slice) => {
                let temp_vmo = self.temp_vmo.lock().await;
                let mut device_block = self.to_blocks(device_offset)?;
                loop {
                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
                    let block_count = self.to_blocks(to_do as u64)? as u32;
                    // Read the chunk into the scratch VMO, then copy it out.
                    self.send(BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Read.into_primitive(),
                            flags: 0,
                            ..Default::default()
                        },
                        vmoid: self.temp_vmo_id.id(),
                        length: block_count,
                        vmo_offset: 0,
                        dev_offset: device_block,
                        ..Default::default()
                    })
                    .await?;
                    temp_vmo.read(&mut slice[..to_do], 0)?;
                    if to_do == slice.len() {
                        break;
                    }
                    device_block += block_count as u64;
                    slice = &mut slice[to_do..];
                }
            }
        }
        Ok(())
    }

    /// Writes `buffer_slice` at `device_offset` (bytes, block-aligned).
    ///
    /// `Memory` slices are staged through the scratch VMO in chunks, mirroring
    /// `read_at`.
    async fn write_at(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), Error> {
        match buffer_slice {
            BufferSlice::VmoId { vmo_id, offset, length } => {
                self.send(BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Write.into_primitive(),
                        flags: 0,
                        ..Default::default()
                    },
                    vmoid: vmo_id.id(),
                    length: self.to_blocks(length)?.try_into()?,
                    vmo_offset: self.to_blocks(offset)?,
                    dev_offset: self.to_blocks(device_offset)?,
                    ..Default::default()
                })
                .await?;
            }
            BufferSlice::Memory(mut slice) => {
                let temp_vmo = self.temp_vmo.lock().await;
                let mut device_block = self.to_blocks(device_offset)?;
                loop {
                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
                    let block_count = self.to_blocks(to_do as u64)? as u32;
                    // Copy the chunk into the scratch VMO before issuing the
                    // device write.
                    temp_vmo.write(&slice[..to_do], 0)?;
                    self.send(BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Write.into_primitive(),
                            flags: 0,
                            ..Default::default()
                        },
                        vmoid: self.temp_vmo_id.id(),
                        length: block_count,
                        vmo_offset: 0,
                        dev_offset: device_block,
                        ..Default::default()
                    })
                    .await?;
                    if to_do == slice.len() {
                        break;
                    }
                    device_block += block_count as u64;
                    slice = &slice[to_do..];
                }
            }
        }
        Ok(())
    }

    /// Issues a Trim for the given byte range (must be block-aligned).
    async fn trim(&self, device_range: Range<u64>) -> Result<(), Error> {
        let length = self.to_blocks(device_range.end - device_range.start)? as u32;
        let dev_offset = self.to_blocks(device_range.start)?;
        self.send(BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: BlockOpcode::Trim.into_primitive(),
                flags: 0,
                ..Default::default()
            },
            // Trim carries no data, so no VMO is referenced.
            vmoid: block_driver::BLOCK_VMOID_INVALID,
            length,
            dev_offset,
            ..Default::default()
        })
        .await
    }

    /// Issues a Flush request.
    async fn flush(&self) -> Result<(), Error> {
        self.send(BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: BlockOpcode::Flush.into_primitive(),
                flags: 0,
                ..Default::default()
            },
            vmoid: block_driver::BLOCK_VMOID_INVALID,
            ..Default::default()
        })
        .await
    }

    fn block_size(&self) -> u32 {
        self.block_size
    }

    fn block_count(&self) -> u64 {
        self.block_count
    }

    fn block_flags(&self) -> BlockFlags {
        self.block_flags
    }

    fn is_connected(&self) -> bool {
        self.fifo_state.lock().unwrap().fifo.is_some()
    }
}
impl Drop for Common {
    fn drop(&mut self) {
        // Neutralize the scratch VMO's ID so its own Drop assertion does not
        // fire; the attachment itself is left for the server to reclaim when
        // the session goes away (presumably — not detached here).
        self.temp_vmo_id.take().into_id();
        // Cancel all in-flight requests and stop the poller.
        self.fifo_state.lock().unwrap().terminate();
    }
}
/// An async [`BlockClient`] backed by a remote block device connection.
pub struct RemoteBlockClient {
    session: block::SessionProxy,
    common: Common,
}

impl RemoteBlockClient {
    /// Opens a session on `remote`, fetches its FIFO, and attaches a scratch
    /// VMO for memory-backed transfers.
    pub async fn new(remote: block::BlockProxy) -> Result<Self, Error> {
        let info = remote.get_info().await?.map_err(zx::Status::from_raw)?;
        let (session, server) = fidl::endpoints::create_proxy()?;
        let () = remote.open_session(server)?;
        let fifo = session.get_fifo().await?.map_err(zx::Status::from_raw)?;
        let fifo = fasync::Fifo::from_fifo(fifo);
        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = session.attach_vmo(dup).await?.map_err(zx::Status::from_raw)?;
        let vmo_id = VmoId::new(vmo_id.id);
        Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
    }
}
// All I/O methods delegate to `Common`; only VMO attachment and session
// close go over the FIDL channel directly.
#[async_trait]
impl BlockClient for RemoteBlockClient {
    async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, Error> {
        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = self.session.attach_vmo(dup).await?.map_err(zx::Status::from_raw)?;
        Ok(VmoId::new(vmo_id.id))
    }

    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), Error> {
        self.common.detach_vmo(vmo_id).await
    }

    async fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), Error> {
        self.common.read_at(buffer_slice, device_offset).await
    }

    async fn write_at(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), Error> {
        self.common.write_at(buffer_slice, device_offset).await
    }

    async fn trim(&self, range: Range<u64>) -> Result<(), Error> {
        self.common.trim(range).await
    }

    async fn flush(&self) -> Result<(), Error> {
        self.common.flush().await
    }

    async fn close(&self) -> Result<(), Error> {
        let () = self.session.close().await?.map_err(zx::Status::from_raw)?;
        Ok(())
    }

    fn block_size(&self) -> u32 {
        self.common.block_size()
    }

    fn block_count(&self) -> u64 {
        self.common.block_count()
    }

    fn block_flags(&self) -> BlockFlags {
        self.common.block_flags()
    }

    fn is_connected(&self) -> bool {
        self.common.is_connected()
    }
}
/// A synchronous wrapper around a remote block device connection.
///
/// The async FIFO machinery still needs an executor, so construction spawns a
/// dedicated thread running a `LocalExecutor` that drives the [`FifoPoller`];
/// the public methods then `block_on` the shared async implementation.
pub struct RemoteBlockClientSync {
    session: block::SessionSynchronousProxy,
    common: Common,
}

impl RemoteBlockClientSync {
    /// Connects synchronously: opens a session, attaches the scratch VMO, and
    /// spawns the poller thread. Blocks until the client is ready.
    pub fn new(client_end: fidl::endpoints::ClientEnd<block::BlockMarker>) -> Result<Self, Error> {
        let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
        let info = remote.get_info(zx::Time::INFINITE)?.map_err(zx::Status::from_raw)?;
        let (client, server) = fidl::endpoints::create_endpoints();
        let () = remote.open_session(server)?;
        let session = block::SessionSynchronousProxy::new(client.into_channel());
        let fifo = session.get_fifo(zx::Time::INFINITE)?.map_err(zx::Status::from_raw)?;
        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = session.attach_vmo(dup, zx::Time::INFINITE)?.map_err(zx::Status::from_raw)?;
        let vmo_id = VmoId::new(vmo_id.id);
        let (sender, receiver) = oneshot::channel::<Result<Self, Error>>();
        std::thread::spawn(move || {
            // `fasync::Fifo` must be created on the thread that owns the
            // executor, so the client is assembled here and handed back.
            let mut executor = fasync::LocalExecutor::new();
            let fifo = fasync::Fifo::from_fifo(fifo);
            let common = Common::new(fifo, &info, temp_vmo, vmo_id);
            let fifo_state = common.fifo_state.clone();
            let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
            // Drive the FIFO until the connection terminates.
            executor.run_singlethreaded(FifoPoller { fifo_state });
        });
        block_on(receiver)?
    }

    pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, Error> {
        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id =
            self.session.attach_vmo(dup, zx::Time::INFINITE)?.map_err(zx::Status::from_raw)?;
        Ok(VmoId::new(vmo_id.id))
    }

    pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), Error> {
        block_on(self.common.detach_vmo(vmo_id))
    }

    pub fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), Error> {
        block_on(self.common.read_at(buffer_slice, device_offset))
    }

    pub fn write_at(&self, buffer_slice: BufferSlice<'_>, device_offset: u64) -> Result<(), Error> {
        block_on(self.common.write_at(buffer_slice, device_offset))
    }

    pub fn flush(&self) -> Result<(), Error> {
        block_on(self.common.flush())
    }

    pub fn close(&self) -> Result<(), Error> {
        let () = self.session.close(zx::Time::INFINITE)?.map_err(zx::Status::from_raw)?;
        Ok(())
    }

    pub fn block_size(&self) -> u32 {
        self.common.block_size()
    }

    pub fn block_count(&self) -> u64 {
        self.common.block_count()
    }

    pub fn is_connected(&self) -> bool {
        self.common.is_connected()
    }
}
impl Drop for RemoteBlockClientSync {
    fn drop(&mut self) {
        // Best-effort close of the session; errors on teardown are ignored.
        let _ = self.close();
    }
}
/// Background task that owns the FIFO I/O: it flushes queued requests and
/// dispatches responses to the waiting [`ResponseFuture`]s. Completes when
/// the connection terminates.
struct FifoPoller {
    fifo_state: FifoStateRef,
}

impl Future for FifoPoller {
    type Output = ();
    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state_lock = self.fifo_state.lock().unwrap();
        // Borrow through the guard so both the fifo and the rest of the state
        // can be accessed below.
        let state = state_lock.deref_mut();
        // Send phase: returns true if the connection has terminated.
        if state.poll_send_requests(context) {
            return Poll::Ready(());
        }
        // Receive phase; the fifo must exist or poll_send_requests would have
        // returned true above.
        let fifo = state.fifo.as_ref().unwrap();
        while let Poll::Ready(result) = fifo.read_one(context) {
            match result {
                Ok(response) => {
                    let request_id = response.reqid;
                    // The entry may be absent if the ResponseFuture was
                    // dropped (request cancelled); ignore such responses.
                    if let Some(request_state) = state.map.get_mut(&request_id) {
                        request_state.result.replace(zx::Status::from_raw(response.status));
                        if let Some(waker) = request_state.waker.take() {
                            waker.wake();
                        }
                    }
                }
                Err(_) => {
                    state.terminate();
                    return Poll::Ready(());
                }
            }
        }
        // Park until a sender queues more work or the FIFO becomes readable.
        state.poller_waker = Some(context.waker().clone());
        Poll::Pending
    }
}
#[cfg(test)]
mod tests {
use {
super::{
BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
RemoteBlockClient, RemoteBlockClientSync,
},
fidl_fuchsia_hardware_block as block,
fuchsia_async::{self as fasync, FifoReadable as _, FifoWritable as _},
fuchsia_zircon as zx,
futures::{
future::{AbortHandle, Abortable, TryFutureExt as _},
join,
stream::{futures_unordered::FuturesUnordered, StreamExt as _},
},
ramdevice_client::RamdiskClient,
};
const RAMDISK_BLOCK_SIZE: u64 = 1024;
const RAMDISK_BLOCK_COUNT: u64 = 1024;
    /// Creates a ramdisk plus two independent connections to it: a raw
    /// `BlockProxy` (for stats queries) and a `RemoteBlockClient` under test.
    pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
        let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
            .await
            .expect("RamdiskClient::create failed");
        let client_end = ramdisk.open().await.expect("ramdisk.open failed");
        let proxy = client_end.into_proxy().expect("into_proxy failed");
        let remote_block_device = RemoteBlockClient::new(proxy).await.expect("new failed");
        assert_eq!(remote_block_device.block_size(), 1024);
        let client_end = ramdisk.open().await.expect("ramdisk.open failed");
        let proxy = client_end.into_proxy().expect("into_proxy failed");
        (ramdisk, proxy, remote_block_device)
    }
    // Round-trips one block through a VMO-backed write then a read, and
    // verifies via the device's stats counters that exactly one write (1024
    // bytes) and one read (2048 bytes) hit the device.
    #[fuchsia::test]
    async fn test_against_ram_disk() {
        let (_ramdisk, block_proxy, remote_block_device) = make_ramdisk().await;
        let stats_before = block_proxy
            .get_stats(false)
            .await
            .expect("get_stats failed")
            .map_err(zx::Status::from_raw)
            .expect("get_stats error");
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        vmo.write(b"hello", 5).expect("vmo.write failed");
        let vmo_id = remote_block_device.attach_vmo(&vmo).await.expect("attach_vmo failed");
        remote_block_device
            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
            .await
            .expect("write_at failed");
        remote_block_device
            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
            .await
            .expect("read_at failed");
        // The read landed at VMO offset 1024, so "hello" should reappear at
        // 1024 + 5 = 1029.
        let mut buf: [u8; 5] = Default::default();
        vmo.read(&mut buf, 1029).expect("vmo.read failed");
        assert_eq!(&buf, b"hello");
        remote_block_device.detach_vmo(vmo_id).await.expect("detach_vmo failed");
        let stats_after = block_proxy
            .get_stats(false)
            .await
            .expect("get_stats failed")
            .map_err(zx::Status::from_raw)
            .expect("get_stats error");
        assert_eq!(
            stats_before.write.success.total_calls + 1,
            stats_after.write.success.total_calls
        );
        assert_eq!(
            stats_before.write.success.bytes_transferred + 1024,
            stats_after.write.success.bytes_transferred
        );
        assert_eq!(stats_before.write.failure.total_calls, stats_after.write.failure.total_calls);
        assert_eq!(stats_before.read.success.total_calls + 1, stats_after.read.success.total_calls);
        assert_eq!(
            stats_before.read.success.bytes_transferred + 2048,
            stats_after.read.success.bytes_transferred
        );
        assert_eq!(stats_before.read.failure.total_calls, stats_after.read.failure.total_calls);
    }

    // Same as above but with an explicit flush between the write and read;
    // additionally checks the flush success counter advances by one.
    #[fuchsia::test]
    async fn test_against_ram_disk_with_flush() {
        let (_ramdisk, block_proxy, remote_block_device) = make_ramdisk().await;
        let stats_before = block_proxy
            .get_stats(false)
            .await
            .expect("get_stats failed")
            .map_err(zx::Status::from_raw)
            .expect("get_stats error");
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        vmo.write(b"hello", 5).expect("vmo.write failed");
        let vmo_id = remote_block_device.attach_vmo(&vmo).await.expect("attach_vmo failed");
        remote_block_device
            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
            .await
            .expect("write_at failed");
        remote_block_device.flush().await.expect("flush failed");
        remote_block_device
            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
            .await
            .expect("read_at failed");
        let mut buf: [u8; 5] = Default::default();
        vmo.read(&mut buf, 1029).expect("vmo.read failed");
        assert_eq!(&buf, b"hello");
        remote_block_device.detach_vmo(vmo_id).await.expect("detach_vmo failed");
        let stats_after = block_proxy
            .get_stats(false)
            .await
            .expect("get_stats failed")
            .map_err(zx::Status::from_raw)
            .expect("get_stats error");
        assert_eq!(
            stats_before.write.success.total_calls + 1,
            stats_after.write.success.total_calls
        );
        assert_eq!(
            stats_before.write.success.bytes_transferred + 1024,
            stats_after.write.success.bytes_transferred
        );
        assert_eq!(stats_before.write.failure.total_calls, stats_after.write.failure.total_calls);
        assert_eq!(
            stats_before.flush.success.total_calls + 1,
            stats_after.flush.success.total_calls
        );
        assert_eq!(stats_before.flush.failure.total_calls, stats_after.flush.failure.total_calls);
        assert_eq!(stats_before.read.success.total_calls + 1, stats_after.read.success.total_calls);
        assert_eq!(
            stats_before.read.success.bytes_transferred + 2048,
            stats_after.read.success.bytes_transferred
        );
        assert_eq!(stats_before.read.failure.total_calls, stats_after.read.failure.total_calls);
    }
    // A write at a non-block-aligned device offset must fail ("bad
    // alignment" from Common::to_blocks).
    #[fuchsia::test]
    async fn test_alignment() {
        let (_ramdisk, _block_proxy, remote_block_device) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = remote_block_device.attach_vmo(&vmo).await.expect("attach_vmo failed");
        remote_block_device
            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
            .await
            .expect_err("expected failure due to bad alignment");
        remote_block_device.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }

    // 1024 concurrent reads should all succeed.
    #[fuchsia::test]
    async fn test_parallel_io() {
        let (_ramdisk, _block_proxy, remote_block_device) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = remote_block_device.attach_vmo(&vmo).await.expect("attach_vmo failed");
        let mut reads = Vec::new();
        for _ in 0..1024 {
            reads.push(
                remote_block_device
                    .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
                    .inspect_err(|e| panic!("read should have succeeded: {}", e)),
            );
        }
        futures::future::join_all(reads).await;
        remote_block_device.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }

    // Destroying the ramdisk while reads are in flight should eventually make
    // the client report disconnected and fail subsequent reads.
    #[fuchsia::test]
    async fn test_closed_device() {
        let (ramdisk, _block_proxy, remote_block_device) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = remote_block_device.attach_vmo(&vmo).await.expect("attach_vmo failed");
        let mut reads = Vec::new();
        for _ in 0..1024 {
            reads.push(
                remote_block_device
                    .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
            );
        }
        assert!(remote_block_device.is_connected());
        let _ = futures::join!(futures::future::join_all(reads), async {
            ramdisk.destroy().await.expect("ramdisk.destroy failed")
        });
        // Destruction is asynchronous; spin until the failure propagates.
        while remote_block_device
            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
            .await
            .is_ok()
        {}
        while remote_block_device.is_connected() {
            fasync::Timer::new(fasync::Time::after(zx::Duration::from_millis(500))).await;
        }
        assert_eq!(remote_block_device.is_connected(), false);
        // The device is gone, so detach may fail; ignore the result.
        let _ = remote_block_device.detach_vmo(vmo_id).await;
    }

    // Dropping unfinished read futures (cancellation) must not wedge the
    // connection; a detach afterwards should still work.
    #[fuchsia::test]
    async fn test_cancelled_reads() {
        let (_ramdisk, _block_proxy, remote_block_device) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = remote_block_device.attach_vmo(&vmo).await.expect("attach_vmo failed");
        {
            let mut reads = FuturesUnordered::new();
            for _ in 0..1024 {
                reads.push(
                    remote_block_device
                        .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
                );
            }
            // Wait for 500 to complete; the remaining 524 are dropped here.
            for _ in 0..500 {
                reads.next().await;
            }
        }
        remote_block_device.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }
    // Exercises the Memory (temp-VMO staged) read/write paths with transfers
    // larger than TEMP_VMO_SIZE, run concurrently on disjoint regions, and
    // verifies the data plus untouched guard blocks on either side.
    #[fuchsia::test]
    async fn test_parallel_large_read_and_write_with_memory_succeds() {
        let (_ramdisk, _block_proxy, remote_block_device) = make_ramdisk().await;
        let remote_block_device_ref = &remote_block_device;
        let test_one = |offset, len, fill| async move {
            let buf = vec![fill; len];
            remote_block_device_ref
                .write_at(buf[..].into(), offset)
                .await
                .expect("write_at failed");
            // Read back with one guard block on each side of the write.
            let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
            remote_block_device_ref
                .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
                .await
                .expect("read_at failed");
            assert_eq!(
                &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
                &[0; RAMDISK_BLOCK_SIZE as usize][..]
            );
            assert_eq!(
                &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
                &buf[..]
            );
            assert_eq!(
                &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
                &[0; RAMDISK_BLOCK_SIZE as usize][..]
            );
        };
        // Non-multiple of TEMP_VMO_SIZE to cover the chunked remainder path.
        const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
        join!(
            test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
            test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
        );
    }
    /// An in-process fake of the block FIDL + FIFO server, with pluggable
    /// handlers for session requests and FIFO requests.
    struct FakeBlockServer<'a> {
        server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
        // Returns true if it fully handled the request (suppressing the
        // default handling below).
        channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
        // Produces the response for each FIFO request.
        fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
    }

    impl<'a> FakeBlockServer<'a> {
        fn new(
            server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
            channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
            fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
        ) -> FakeBlockServer<'a> {
            FakeBlockServer {
                server_channel: Some(server_channel),
                channel_handler: Box::new(channel_handler),
                fifo_handler: Box::new(fifo_handler),
            }
        }

        /// Serves the channel and the FIFO concurrently until the client goes
        /// away (or Close aborts the FIFO loop).
        async fn run(&mut self) {
            let server = self.server_channel.take().unwrap();
            let (server_fifo, client_fifo) =
                zx::Fifo::create(16, std::mem::size_of::<BlockFifoRequest>())
                    .expect("Fifo::create failed");
            // The client half is handed out once via GetFifo.
            let maybe_server_fifo = std::sync::Mutex::new(Some(client_fifo));
            let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
            // Echo loop: read a request, run the handler, write the response.
            let fifo_future = Abortable::new(
                async {
                    let fifo = fasync::Fifo::from_fifo(server_fifo);
                    loop {
                        let request = match fifo.read_entry().await {
                            Ok(r) => r,
                            Err(zx::Status::PEER_CLOSED) => break,
                            Err(e) => panic!("read_entry failed {:?}", e),
                        };
                        let response = self.fifo_handler.as_ref()(request);
                        fifo.write_entries(std::slice::from_ref(&response))
                            .await
                            .expect("write_entries failed");
                    }
                },
                fifo_future_abort_registration,
            );
            let channel_future = async {
                server
                    .into_stream()
                    .expect("into_stream failed")
                    .for_each_concurrent(None, |request| async {
                        let request = request.expect("unexpected fidl error");
                        match request {
                            block::BlockRequest::GetInfo { responder } => {
                                responder
                                    .send(Ok(&block::BlockInfo {
                                        block_count: 1024,
                                        block_size: 512,
                                        max_transfer_size: 1024 * 1024,
                                        flags: block::Flag::empty(),
                                    }))
                                    .expect("send failed");
                            }
                            block::BlockRequest::OpenSession { session, control_handle: _ } => {
                                let stream = session.into_stream().expect("into_stream failed");
                                stream
                                    .for_each(|request| async {
                                        let request = request.expect("unexpected fidl error");
                                        // Give the test hook first refusal.
                                        if self.channel_handler.as_ref()(&request) {
                                            return;
                                        }
                                        match request {
                                            block::SessionRequest::GetFifo { responder } => {
                                                match maybe_server_fifo.lock().unwrap().take() {
                                                    Some(fifo) => responder.send(Ok(fifo)),
                                                    None => responder.send(Err(
                                                        zx::Status::NO_RESOURCES.into_raw(),
                                                    )),
                                                }
                                                .expect("send failed")
                                            }
                                            block::SessionRequest::AttachVmo {
                                                vmo: _,
                                                responder,
                                            } => responder
                                                .send(Ok(&block::VmoId { id: 1 }))
                                                .expect("send failed"),
                                            block::SessionRequest::Close { responder } => {
                                                // Stop the FIFO loop so `run`
                                                // can complete.
                                                fifo_future_abort.abort();
                                                responder.send(Ok(())).expect("send failed")
                                            }
                                        }
                                    })
                                    .await
                            }
                            _ => panic!("Unexpected message"),
                        }
                    })
                    .await;
            };
            let _result = join!(fifo_future, channel_future);
        }
    }
    // RemoteBlockClientSync's Drop should send Close on the session channel.
    #[fuchsia::test]
    async fn test_block_close_is_called() {
        let close_called = std::sync::Mutex::new(false);
        let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();
        // Construct (and immediately drop) the sync client on its own thread;
        // it uses blocking FIDL calls that would deadlock this executor.
        std::thread::spawn(move || {
            let _remote_block_device =
                RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
        });
        let channel_handler = |request: &block::SessionRequest| -> bool {
            if let block::SessionRequest::Close { .. } = request {
                *close_called.lock().unwrap() = true;
            }
            // Let the fake server still perform its default handling.
            false
        };
        FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;
        assert!(*close_called.lock().unwrap());
    }

    // flush() should result in exactly a Flush opcode on the FIFO.
    #[fuchsia::test]
    async fn test_block_flush_is_called() {
        let (proxy, server) = fidl::endpoints::create_proxy().expect("create_proxy failed");
        futures::join!(
            async {
                let remote_block_device = RemoteBlockClient::new(proxy).await.expect("new failed");
                remote_block_device.flush().await.expect("flush failed");
            },
            async {
                let flush_called = std::sync::Mutex::new(false);
                let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
                    *flush_called.lock().unwrap() = true;
                    assert_eq!(request.command.opcode, super::BlockOpcode::Flush.into_primitive());
                    BlockFifoResponse {
                        status: zx::Status::OK.into_raw(),
                        reqid: request.reqid,
                        ..Default::default()
                    }
                };
                FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
                assert!(*flush_called.lock().unwrap());
            }
        );
    }

    // Every FIFO request should carry a non-zero trace flow ID.
    #[fuchsia::test]
    async fn test_trace_flow_ids_set() {
        let (proxy, server) = fidl::endpoints::create_proxy().expect("create_proxy failed");
        futures::join!(
            async {
                let remote_block_device = RemoteBlockClient::new(proxy).await.expect("new failed");
                remote_block_device.flush().await.expect("flush failed");
            },
            async {
                let flow_id: std::sync::Mutex<Option<u64>> = std::sync::Mutex::new(None);
                let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
                    if request.trace_flow_id > 0 {
                        *flow_id.lock().unwrap() = Some(request.trace_flow_id);
                    }
                    BlockFifoResponse {
                        status: zx::Status::OK.into_raw(),
                        reqid: request.reqid,
                        ..Default::default()
                    }
                };
                FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
                assert!(flow_id.lock().unwrap().is_some());
            }
        );
    }
}