use super::{
DecodedRequest, GroupOrRequest, IntoSessionManager, Operation, PartitionInfo, SessionHelper,
};
use anyhow::Error;
use block_protocol::{BlockFifoRequest, BlockFifoResponse, WriteOptions};
use fuchsia_async::{self as fasync, FifoReadable as _, FifoWritable as _};
use futures::stream::StreamExt as _;
use futures::FutureExt;
use std::borrow::Cow;
use std::future::Future;
use std::mem::MaybeUninit;
use std::sync::Arc;
use {fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume};
pub trait Interface: Send + Sync + Unpin + 'static {
fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> impl Future<Output = Result<(), zx::Status>> + Send {
async { Ok(()) }
}
fn on_detach_vmo(&self, _vmo: &zx::Vmo) {}
fn get_info(&self) -> impl Future<Output = Result<Cow<'_, PartitionInfo>, zx::Status>> + Send;
fn read(
&self,
device_block_offset: u64,
block_count: u32,
vmo: &Arc<zx::Vmo>,
vmo_offset: u64, ) -> impl Future<Output = Result<(), zx::Status>> + Send;
fn write(
&self,
device_block_offset: u64,
block_count: u32,
vmo: &Arc<zx::Vmo>,
vmo_offset: u64, opts: WriteOptions,
) -> impl Future<Output = Result<(), zx::Status>> + Send;
fn flush(&self) -> impl Future<Output = Result<(), zx::Status>> + Send;
fn trim(
&self,
device_block_offset: u64,
block_count: u32,
) -> impl Future<Output = Result<(), zx::Status>> + Send;
fn get_volume_info(
&self,
) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
{
async { Err(zx::Status::NOT_SUPPORTED) }
}
fn query_slices(
&self,
_start_slices: &[u64],
) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
async { Err(zx::Status::NOT_SUPPORTED) }
}
fn extend(
&self,
_start_slice: u64,
_slice_count: u64,
) -> impl Future<Output = Result<(), zx::Status>> + Send {
async { Err(zx::Status::NOT_SUPPORTED) }
}
fn shrink(
&self,
_start_slice: u64,
_slice_count: u64,
) -> impl Future<Output = Result<(), zx::Status>> + Send {
async { Err(zx::Status::NOT_SUPPORTED) }
}
}
pub struct SessionManager<I> {
interface: Arc<I>,
}
impl<I: Interface> SessionManager<I> {
pub fn new(interface: Arc<I>) -> Self {
Self { interface }
}
}
impl<I: Interface> super::SessionManager for SessionManager<I> {
async fn on_attach_vmo(self: Arc<Self>, vmo: &Arc<zx::Vmo>) -> Result<(), zx::Status> {
self.interface.on_attach_vmo(vmo).await
}
async fn open_session(
self: Arc<Self>,
stream: fblock::SessionRequestStream,
block_size: u32,
) -> Result<(), Error> {
let (helper, fifo) = SessionHelper::new(self.clone(), block_size)?;
let helper = Arc::new(helper);
let interface = self.interface.clone();
let mut stream = stream.fuse();
let fifo = Arc::new(fasync::Fifo::from_fifo(fifo));
let scope = fasync::Scope::new();
let scope_ref = scope.clone();
let helper_clone = helper.clone();
let mut fifo_task = scope
.spawn(async move {
let mut requests = [MaybeUninit::<BlockFifoRequest>::uninit(); 64];
while let Ok(count) = fifo.read_entries(&mut requests[..]).await {
for request in &requests[..count] {
if let Some(decoded_request) =
helper.decode_fifo_request(unsafe { request.assume_init_ref() })
{
let interface = interface.clone();
let fifo = fifo.clone();
let helper = helper.clone();
scope_ref.spawn(async move {
let group_or_request = decoded_request.group_or_request;
let status =
process_fifo_request(interface, decoded_request).await.into();
match group_or_request {
GroupOrRequest::Group(group_id) => {
if let Some(response) =
helper.message_groups.complete(group_id, status)
{
if let Err(_) = fifo.write_entries(&response).await {
return;
}
}
}
GroupOrRequest::Request(reqid) => {
if let Err(_) = fifo
.write_entries(&BlockFifoResponse {
status: status.into_raw(),
reqid,
..Default::default()
})
.await
{
return;
}
}
}
});
}
}
}
})
.fuse();
scopeguard::defer! {
for (_, vmo) in helper_clone.take_vmos() {
self.interface.on_detach_vmo(&vmo);
}
}
loop {
futures::select! {
maybe_req = stream.next() => {
if let Some(req) = maybe_req {
helper_clone.handle_request(req?).await?;
} else {
break;
}
}
_ = fifo_task => break,
}
}
Ok(())
}
async fn get_info(&self) -> Result<Cow<'_, PartitionInfo>, zx::Status> {
self.interface.get_info().await
}
async fn get_volume_info(
&self,
) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
self.interface.get_volume_info().await
}
async fn query_slices(
&self,
start_slices: &[u64],
) -> Result<Vec<fvolume::VsliceRange>, zx::Status> {
self.interface.query_slices(start_slices).await
}
async fn extend(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
self.interface.extend(start_slice, slice_count).await
}
async fn shrink(&self, start_slice: u64, slice_count: u64) -> Result<(), zx::Status> {
self.interface.shrink(start_slice, slice_count).await
}
}
impl<I: Interface> IntoSessionManager for Arc<I> {
type SM = SessionManager<I>;
fn into_session_manager(self) -> Arc<Self::SM> {
Arc::new(SessionManager { interface: self })
}
}
async fn process_fifo_request<I: Interface>(
interface: Arc<I>,
r: DecodedRequest,
) -> Result<(), zx::Status> {
match r.operation? {
Operation::Read { device_block_offset, block_count, _unused, vmo_offset } => {
interface
.read(device_block_offset, block_count, &r.vmo.as_ref().unwrap(), vmo_offset)
.await
}
Operation::Write { device_block_offset, block_count, options, vmo_offset } => {
interface
.write(
device_block_offset,
block_count,
&r.vmo.as_ref().unwrap(),
vmo_offset,
options,
)
.await
}
Operation::Flush => interface.flush().await,
Operation::Trim { device_block_offset, block_count } => {
interface.trim(device_block_offset, block_count).await
}
Operation::CloseVmo => {
if let Some(vmo) = &r.vmo {
interface.on_detach_vmo(vmo);
}
Ok(())
}
}
}