block_server/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use anyhow::{Error, anyhow};
5use block_protocol::{BlockFifoRequest, BlockFifoResponse, InlineCryptoOptions};
6use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
7use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
8use fuchsia_async::epoch::{Epoch, EpochGuard};
9use fuchsia_sync::{MappedMutexGuard, Mutex, MutexGuard};
10use futures::{Future, FutureExt as _, TryStreamExt as _};
11use slab::Slab;
12use std::borrow::Cow;
13use std::collections::BTreeMap;
14use std::num::NonZero;
15use std::ops::Range;
16use std::sync::Arc;
17use std::sync::atomic::AtomicU64;
18use storage_device::buffer::Buffer;
19use zx::HandleBased;
20use {
21    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
22    fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
23};
24
25pub mod async_interface;
26pub mod c_interface;
27
28#[cfg(test)]
29mod decompression_tests;
30
31pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
32
33type TraceFlowId = Option<NonZero<u64>>;
34
35#[derive(Clone)]
36pub enum DeviceInfo {
37    Block(BlockInfo),
38    Partition(PartitionInfo),
39}
40
41impl DeviceInfo {
42    pub fn device_flags(&self) -> fblock::Flag {
43        match self {
44            Self::Block(BlockInfo { device_flags, .. }) => *device_flags,
45            Self::Partition(PartitionInfo { device_flags, .. }) => *device_flags,
46        }
47    }
48
49    pub fn block_count(&self) -> Option<u64> {
50        match self {
51            Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
52            Self::Partition(PartitionInfo { block_range, .. }) => {
53                block_range.as_ref().map(|range| range.end - range.start)
54            }
55        }
56    }
57
58    pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
59        match self {
60            Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
61            Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
62                max_transfer_blocks.clone()
63            }
64        }
65    }
66
67    fn max_transfer_size(&self, block_size: u32) -> u32 {
68        if let Some(max_blocks) = self.max_transfer_blocks() {
69            max_blocks.get() * block_size
70        } else {
71            MAX_TRANSFER_UNBOUNDED
72        }
73    }
74}
75
76/// Information associated with non-partition block devices.
77#[derive(Clone, Default)]
78pub struct BlockInfo {
79    pub device_flags: fblock::Flag,
80    pub block_count: u64,
81    pub max_transfer_blocks: Option<NonZero<u32>>,
82}
83
84/// Information associated with a block device that is also a partition.
85#[derive(Clone, Default)]
86pub struct PartitionInfo {
87    /// The device flags reported by the underlying device.
88    pub device_flags: fblock::Flag,
89    pub max_transfer_blocks: Option<NonZero<u32>>,
90    /// If `block_range` is None, the partition is a volume and may not be contiguous.
91    /// In this case, the server will use the `get_volume_info` method to get the count of assigned
92    /// slices and use that (along with the slice and block sizes) to determine the block count.
93    pub block_range: Option<Range<u64>>,
94    pub type_guid: [u8; 16],
95    pub instance_guid: [u8; 16],
96    pub name: String,
97    pub flags: u64,
98}
99
100/// We internally keep track of active requests, so that when the server is torn down, we can
101/// deallocate all of the resources for pending requests.
102struct ActiveRequest<S> {
103    session: S,
104    group_or_request: GroupOrRequest,
105    trace_flow_id: TraceFlowId,
106    _epoch_guard: EpochGuard<'static>,
107    status: zx::Status,
108    count: u32,
109    req_id: Option<u32>,
110    decompression_info: Option<DecompressionInfo>,
111}
112
113struct DecompressionInfo {
114    // This is the range of compressed bytes in receiving buffer.
115    compressed_range: Range<usize>,
116
117    // This is the range in the target VMO where we will write uncompressed bytes.
118    uncompressed_range: Range<u64>,
119
120    bytes_so_far: u64,
121    mapping: Arc<VmoMapping>,
122    buffer: Option<Buffer<'static>>,
123}
124
125impl DecompressionInfo {
126    /// Returns the uncompressed slice.
127    fn uncompressed_slice(&self) -> *mut [u8] {
128        std::ptr::slice_from_raw_parts_mut(
129            (self.mapping.base + self.uncompressed_range.start as usize) as *mut u8,
130            (self.uncompressed_range.end - self.uncompressed_range.start) as usize,
131        )
132    }
133}
134
135pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
136
137impl<S> Default for ActiveRequests<S> {
138    fn default() -> Self {
139        Self(Mutex::new(ActiveRequestsInner { requests: Slab::default() }))
140    }
141}
142
143impl<S> ActiveRequests<S> {
144    fn complete_and_take_response(
145        &self,
146        request_id: RequestId,
147        status: zx::Status,
148    ) -> Option<(S, BlockFifoResponse)> {
149        self.0.lock().complete_and_take_response(request_id, status)
150    }
151
152    fn request(&self, request_id: RequestId) -> MappedMutexGuard<'_, ActiveRequest<S>> {
153        MutexGuard::map(self.0.lock(), |i| &mut i.requests[request_id.0])
154    }
155}
156
157struct ActiveRequestsInner<S> {
158    requests: Slab<ActiveRequest<S>>,
159}
160
161// Keeps track of all the requests that are currently being processed
162impl<S> ActiveRequestsInner<S> {
163    /// Completes a request.
164    fn complete(&mut self, request_id: RequestId, status: zx::Status) {
165        let group = &mut self.requests[request_id.0];
166
167        group.count = group.count.checked_sub(1).unwrap();
168        if status != zx::Status::OK && group.status == zx::Status::OK {
169            group.status = status
170        }
171
172        fuchsia_trace::duration!(
173            c"storage",
174            c"block_server::finish_transaction",
175            "request_id" => request_id.0,
176            "group_completed" => group.count == 0,
177            "status" => status.into_raw());
178        if let Some(trace_flow_id) = group.trace_flow_id {
179            fuchsia_trace::flow_step!(
180                c"storage",
181                c"block_server::finish_request",
182                trace_flow_id.get().into()
183            );
184        }
185
186        if group.count == 0
187            && group.status == zx::Status::OK
188            && let Some(info) = &mut group.decompression_info
189        {
190            thread_local! {
191                static DECOMPRESSOR: std::cell::RefCell<zstd::bulk::Decompressor<'static>> =
192                    std::cell::RefCell::new(zstd::bulk::Decompressor::new().unwrap());
193            }
194            DECOMPRESSOR.with(|decompressor| {
195                // SAFETY: We verified `uncompressed_range` fits within our mapping.
196                let target_slice = unsafe { info.uncompressed_slice().as_mut().unwrap() };
197                let mut decompressor = decompressor.borrow_mut();
198                if let Err(error) = decompressor.decompress_to_buffer(
199                    &info.buffer.take().unwrap().as_slice()[info.compressed_range.clone()],
200                    target_slice,
201                ) {
202                    log::warn!(error:?; "Decompression error");
203                    group.status = zx::Status::IO_DATA_INTEGRITY;
204                };
205            });
206        }
207    }
208
209    /// Takes the response if all requests are finished.
210    fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
211        let group = &self.requests[request_id.0];
212        match group.req_id {
213            Some(reqid) if group.count == 0 => {
214                let group = self.requests.remove(request_id.0);
215                Some((
216                    group.session,
217                    BlockFifoResponse {
218                        status: group.status.into_raw(),
219                        reqid,
220                        group: group.group_or_request.group_id().unwrap_or(0),
221                        ..Default::default()
222                    },
223                ))
224            }
225            _ => None,
226        }
227    }
228
229    /// Competes the request and returns a response if the request group is finished.
230    fn complete_and_take_response(
231        &mut self,
232        request_id: RequestId,
233        status: zx::Status,
234    ) -> Option<(S, BlockFifoResponse)> {
235        self.complete(request_id, status);
236        self.take_response(request_id)
237    }
238}
239
240/// BlockServer is an implementation of fuchsia.hardware.block.partition.Partition.
241/// cbindgen:no-export
242pub struct BlockServer<SM> {
243    block_size: u32,
244    session_manager: Arc<SM>,
245}
246
247/// A single entry in `[OffsetMap]`.
248#[derive(Debug)]
249pub struct BlockOffsetMapping {
250    source_block_offset: u64,
251    target_block_offset: u64,
252    length: u64,
253}
254
255impl BlockOffsetMapping {
256    fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
257        blocks.0 >= self.source_block_offset
258            && blocks.0 + blocks.1 as u64 - self.source_block_offset <= self.length
259    }
260}
261
262impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
263    type Error = zx::Status;
264
265    fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
266        wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
267        wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
268        Ok(Self {
269            source_block_offset: wire.source_block_offset,
270            target_block_offset: wire.target_block_offset,
271            length: wire.length,
272        })
273    }
274}
275
276/// Remaps the offset of block requests based on an internal map, and truncates long requests.
277pub struct OffsetMap {
278    mapping: Option<BlockOffsetMapping>,
279    max_transfer_blocks: Option<NonZero<u32>>,
280}
281
282impl OffsetMap {
283    /// An OffsetMap that remaps requests.
284    pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
285        Self { mapping: Some(mapping), max_transfer_blocks }
286    }
287
288    /// An OffsetMap that just enforces maximum request sizes.
289    pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
290        Self { mapping: None, max_transfer_blocks }
291    }
292
293    pub fn is_empty(&self) -> bool {
294        self.mapping.is_none()
295    }
296
297    fn mapping(&self) -> Option<&BlockOffsetMapping> {
298        self.mapping.as_ref()
299    }
300
301    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
302        self.max_transfer_blocks
303    }
304}
305
306// Methods take Arc<Self> rather than &self because of
307// https://github.com/rust-lang/rust/issues/42940.
308pub trait SessionManager: 'static {
309    const SUPPORTS_DECOMPRESSION: bool;
310
311    type Session;
312
313    fn on_attach_vmo(
314        self: Arc<Self>,
315        vmo: &Arc<zx::Vmo>,
316    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
317
318    /// Creates a new session to handle `stream`.
319    /// The returned future should run until the session completes, for example when the client end
320    /// closes.
321    /// `offset_map`, will be used to adjust the block offset/length of FIFO requests.
322    fn open_session(
323        self: Arc<Self>,
324        stream: fblock::SessionRequestStream,
325        offset_map: OffsetMap,
326        block_size: u32,
327    ) -> impl Future<Output = Result<(), Error>> + Send;
328
329    /// Called to get block/partition information for Block::GetInfo, Partition::GetTypeGuid, etc.
330    fn get_info(&self) -> Cow<'_, DeviceInfo>;
331
332    /// Called to handle the GetVolumeInfo FIDL call.
333    fn get_volume_info(
334        &self,
335    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
336    {
337        async { Err(zx::Status::NOT_SUPPORTED) }
338    }
339
340    /// Called to handle the QuerySlices FIDL call.
341    fn query_slices(
342        &self,
343        _start_slices: &[u64],
344    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
345        async { Err(zx::Status::NOT_SUPPORTED) }
346    }
347
348    /// Called to handle the Shrink FIDL call.
349    fn extend(
350        &self,
351        _start_slice: u64,
352        _slice_count: u64,
353    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
354        async { Err(zx::Status::NOT_SUPPORTED) }
355    }
356
357    /// Called to handle the Shrink FIDL call.
358    fn shrink(
359        &self,
360        _start_slice: u64,
361        _slice_count: u64,
362    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
363        async { Err(zx::Status::NOT_SUPPORTED) }
364    }
365
366    /// Returns the active requests.
367    fn active_requests(&self) -> &ActiveRequests<Self::Session>;
368}
369
370pub trait IntoSessionManager {
371    type SM: SessionManager;
372
373    fn into_session_manager(self) -> Arc<Self::SM>;
374}
375
376impl<SM: SessionManager> BlockServer<SM> {
377    pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
378        Self { block_size, session_manager: session_manager.into_session_manager() }
379    }
380
381    pub fn session_manager(&self) -> &SM {
382        self.session_manager.as_ref()
383    }
384
385    /// Called to process requests for fuchsia.hardware.block.volume/Volume.
386    pub async fn handle_requests(
387        &self,
388        mut requests: fvolume::VolumeRequestStream,
389    ) -> Result<(), Error> {
390        let scope = fasync::Scope::new();
391        loop {
392            match requests.try_next().await {
393                Ok(Some(request)) => {
394                    if let Some(session) = self.handle_request(request).await? {
395                        scope.spawn(session.map(|_| ()));
396                    }
397                }
398                Ok(None) => break,
399                Err(err) => log::warn!(err:?; "Invalid request"),
400            }
401        }
402        scope.await;
403        Ok(())
404    }
405
406    /// Processes a partition request.  If a new session task is created in response to the request,
407    /// it is returned.
408    async fn handle_request(
409        &self,
410        request: fvolume::VolumeRequest,
411    ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send + use<SM>>, Error> {
412        match request {
413            fvolume::VolumeRequest::GetInfo { responder } => {
414                let info = self.device_info();
415                let max_transfer_size = info.max_transfer_size(self.block_size);
416                let (block_count, mut flags) = match info.as_ref() {
417                    DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
418                        (*block_count, *device_flags)
419                    }
420                    DeviceInfo::Partition(partition_info) => {
421                        let block_count = if let Some(range) = partition_info.block_range.as_ref() {
422                            range.end - range.start
423                        } else {
424                            let volume_info = self.session_manager.get_volume_info().await?;
425                            volume_info.0.slice_size * volume_info.1.partition_slice_count
426                                / self.block_size as u64
427                        };
428                        (block_count, partition_info.device_flags)
429                    }
430                };
431                if SM::SUPPORTS_DECOMPRESSION {
432                    flags |= fblock::Flag::ZSTD_DECOMPRESSION_SUPPORT;
433                }
434                responder.send(Ok(&fblock::BlockInfo {
435                    block_count,
436                    block_size: self.block_size,
437                    max_transfer_size,
438                    flags,
439                }))?;
440            }
441            fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
442                let info = self.device_info();
443                return Ok(Some(self.session_manager.clone().open_session(
444                    session.into_stream(),
445                    OffsetMap::empty(info.max_transfer_blocks()),
446                    self.block_size,
447                )));
448            }
449            fvolume::VolumeRequest::OpenSessionWithOffsetMap {
450                session,
451                mapping,
452                control_handle: _,
453            } => {
454                let info = self.device_info();
455                let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
456                    Ok(m) => m,
457                    Err(status) => {
458                        session.close_with_epitaph(status)?;
459                        return Ok(None);
460                    }
461                };
462                if let Some(max) = info.block_count() {
463                    if initial_mapping.target_block_offset + initial_mapping.length > max {
464                        log::warn!("Invalid mapping for session: {initial_mapping:?} (max {max})");
465                        session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
466                        return Ok(None);
467                    }
468                }
469                return Ok(Some(self.session_manager.clone().open_session(
470                    session.into_stream(),
471                    OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
472                    self.block_size,
473                )));
474            }
475            fvolume::VolumeRequest::GetTypeGuid { responder } => {
476                let info = self.device_info();
477                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
478                    let mut guid =
479                        fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
480                    guid.value.copy_from_slice(&partition_info.type_guid);
481                    responder.send(zx::sys::ZX_OK, Some(&guid))?;
482                } else {
483                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
484                }
485            }
486            fvolume::VolumeRequest::GetInstanceGuid { responder } => {
487                let info = self.device_info();
488                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
489                    let mut guid =
490                        fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
491                    guid.value.copy_from_slice(&partition_info.instance_guid);
492                    responder.send(zx::sys::ZX_OK, Some(&guid))?;
493                } else {
494                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
495                }
496            }
497            fvolume::VolumeRequest::GetName { responder } => {
498                let info = self.device_info();
499                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
500                    responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
501                } else {
502                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
503                }
504            }
505            fvolume::VolumeRequest::GetMetadata { responder } => {
506                let info = self.device_info();
507                if let DeviceInfo::Partition(info) = info.as_ref() {
508                    let mut type_guid =
509                        fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
510                    type_guid.value.copy_from_slice(&info.type_guid);
511                    let mut instance_guid =
512                        fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
513                    instance_guid.value.copy_from_slice(&info.instance_guid);
514                    responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
515                        name: Some(info.name.clone()),
516                        type_guid: Some(type_guid),
517                        instance_guid: Some(instance_guid),
518                        start_block_offset: info.block_range.as_ref().map(|range| range.start),
519                        num_blocks: info.block_range.as_ref().map(|range| range.end - range.start),
520                        flags: Some(info.flags),
521                        ..Default::default()
522                    }))?;
523                } else {
524                    responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
525                }
526            }
527            fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
528                match self.session_manager.query_slices(&start_slices).await {
529                    Ok(mut results) => {
530                        let results_len = results.len();
531                        assert!(results_len <= 16);
532                        results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
533                        responder.send(
534                            zx::sys::ZX_OK,
535                            &results.try_into().unwrap(),
536                            results_len as u64,
537                        )?;
538                    }
539                    Err(s) => {
540                        responder.send(
541                            s.into_raw(),
542                            &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
543                            0,
544                        )?;
545                    }
546                }
547            }
548            fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
549                match self.session_manager.get_volume_info().await {
550                    Ok((manager_info, volume_info)) => {
551                        responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
552                    }
553                    Err(s) => responder.send(s.into_raw(), None, None)?,
554                }
555            }
556            fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
557                responder.send(
558                    zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
559                        .into_raw(),
560                )?;
561            }
562            fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
563                responder.send(
564                    zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
565                        .into_raw(),
566                )?;
567            }
568            fvolume::VolumeRequest::Destroy { responder, .. } => {
569                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
570            }
571        }
572        Ok(None)
573    }
574
575    fn device_info(&self) -> Cow<'_, DeviceInfo> {
576        self.session_manager.get_info()
577    }
578}
579
580struct SessionHelper<SM: SessionManager> {
581    session_manager: Arc<SM>,
582    offset_map: OffsetMap,
583    block_size: u32,
584    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
585    vmos: Mutex<BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)>>,
586}
587
588struct VmoMapping {
589    base: usize,
590    size: usize,
591}
592
593impl VmoMapping {
594    fn new(vmo: &zx::Vmo) -> Result<Arc<Self>, zx::Status> {
595        let size = vmo.get_size().unwrap() as usize;
596        Ok(Arc::new(Self {
597            base: fuchsia_runtime::vmar_root_self()
598                .map(0, vmo, 0, size, zx::VmarFlags::PERM_WRITE | zx::VmarFlags::PERM_READ)
599                .inspect_err(|error| {
600                    log::warn!(error:?, size; "VmoMapping: unable to map VMO");
601                })?,
602            size,
603        }))
604    }
605}
606
607impl Drop for VmoMapping {
608    fn drop(&mut self) {
609        // SAFETY: We mapped this in `VmoMapping::new`.
610        unsafe {
611            let _ = fuchsia_runtime::vmar_root_self().unmap(self.base, self.size);
612        }
613    }
614}
615
616impl<SM: SessionManager> SessionHelper<SM> {
617    fn new(
618        session_manager: Arc<SM>,
619        offset_map: OffsetMap,
620        block_size: u32,
621    ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
622        let (peer_fifo, fifo) = zx::Fifo::create(16)?;
623        Ok((
624            Self { session_manager, offset_map, block_size, peer_fifo, vmos: Mutex::default() },
625            fifo,
626        ))
627    }
628
629    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
630        match request {
631            fblock::SessionRequest::GetFifo { responder } => {
632                let rights = zx::Rights::TRANSFER
633                    | zx::Rights::READ
634                    | zx::Rights::WRITE
635                    | zx::Rights::SIGNAL
636                    | zx::Rights::WAIT;
637                match self.peer_fifo.duplicate_handle(rights) {
638                    Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
639                    Err(s) => responder.send(Err(s.into_raw()))?,
640                }
641                Ok(())
642            }
643            fblock::SessionRequest::AttachVmo { vmo, responder } => {
644                let vmo = Arc::new(vmo);
645                let vmo_id = {
646                    let mut vmos = self.vmos.lock();
647                    if vmos.len() == u16::MAX as usize {
648                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
649                        return Ok(());
650                    } else {
651                        let vmo_id = match vmos.last_entry() {
652                            None => 1,
653                            Some(o) => {
654                                o.key().checked_add(1).unwrap_or_else(|| {
655                                    let mut vmo_id = 1;
656                                    // Find the first gap...
657                                    for (&id, _) in &*vmos {
658                                        if id > vmo_id {
659                                            break;
660                                        }
661                                        vmo_id = id + 1;
662                                    }
663                                    vmo_id
664                                })
665                            }
666                        };
667                        vmos.insert(vmo_id, (vmo.clone(), None));
668                        vmo_id
669                    }
670                };
671                self.session_manager.clone().on_attach_vmo(&vmo).await?;
672                responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
673                Ok(())
674            }
675            fblock::SessionRequest::Close { responder } => {
676                responder.send(Ok(()))?;
677                Err(anyhow!("Closed"))
678            }
679        }
680    }
681
682    /// Decodes `request`.
683    fn decode_fifo_request(
684        &self,
685        session: SM::Session,
686        request: &BlockFifoRequest,
687    ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
688        let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
689
690        let request_bytes = request.length as u64 * self.block_size as u64;
691
692        let mut operation = BlockOpcode::from_primitive(request.command.opcode)
693            .ok_or(zx::Status::INVALID_ARGS)
694            .and_then(|code| {
695                if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
696                    if code != BlockOpcode::Read {
697                        return Err(zx::Status::INVALID_ARGS);
698                    }
699                    if !SM::SUPPORTS_DECOMPRESSION {
700                        return Err(zx::Status::NOT_SUPPORTED);
701                    }
702                }
703                if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
704                    if request.length == 0 {
705                        return Err(zx::Status::INVALID_ARGS);
706                    }
707                    // Make sure the end offsets won't wrap.
708                    if request.dev_offset.checked_add(request.length as u64).is_none()
709                        || (code != BlockOpcode::Trim
710                            && request_bytes.checked_add(request.vmo_offset).is_none())
711                    {
712                        return Err(zx::Status::OUT_OF_RANGE);
713                    }
714                }
715                Ok(match code {
716                    BlockOpcode::Read => Operation::Read {
717                        device_block_offset: request.dev_offset,
718                        block_count: request.length,
719                        _unused: 0,
720                        vmo_offset: request
721                            .vmo_offset
722                            .checked_mul(self.block_size as u64)
723                            .ok_or(zx::Status::OUT_OF_RANGE)?,
724                        options: ReadOptions {
725                            inline_crypto_options: InlineCryptoOptions {
726                                dun: request.dun,
727                                slot: request.slot,
728                            },
729                        },
730                    },
731                    BlockOpcode::Write => {
732                        let mut options = WriteOptions {
733                            inline_crypto_options: InlineCryptoOptions {
734                                dun: request.dun,
735                                slot: request.slot,
736                            },
737                            ..WriteOptions::default()
738                        };
739                        if flags.contains(BlockIoFlag::FORCE_ACCESS) {
740                            options.flags |= WriteFlags::FORCE_ACCESS;
741                        }
742                        if flags.contains(BlockIoFlag::PRE_BARRIER) {
743                            options.flags |= WriteFlags::PRE_BARRIER;
744                        }
745                        Operation::Write {
746                            device_block_offset: request.dev_offset,
747                            block_count: request.length,
748                            _unused: 0,
749                            options,
750                            vmo_offset: request
751                                .vmo_offset
752                                .checked_mul(self.block_size as u64)
753                                .ok_or(zx::Status::OUT_OF_RANGE)?,
754                        }
755                    }
756                    BlockOpcode::Flush => Operation::Flush,
757                    BlockOpcode::Trim => Operation::Trim {
758                        device_block_offset: request.dev_offset,
759                        block_count: request.length,
760                    },
761                    BlockOpcode::CloseVmo => Operation::CloseVmo,
762                })
763            });
764
765        let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
766            GroupOrRequest::Group(request.group)
767        } else {
768            GroupOrRequest::Request(request.reqid)
769        };
770
771        let mut active_requests = self.session_manager.active_requests().0.lock();
772        let mut request_id = None;
773
774        // Multiple Block I/O request may be sent as a group.
775        // Notes:
776        // - the group is identified by the group id in the request
777        // - if using groups, a response will not be sent unless `BlockIoFlag::GROUP_LAST`
778        //   flag is set.
779        // - when processing a request of a group fails, subsequent requests of that
780        //   group will not be processed.
781        // - decompression is a special case, see block-fifo.h for semantics.
782        //
783        // Refer to sdk/fidl/fuchsia.hardware.block.driver/block.fidl for details.
784        if group_or_request.is_group() {
785            // Search for an existing entry that matches this group.  NOTE: This is a potentially
786            // expensive way to find a group (it's iterating over all slots in the active-requests
787            // slab).  This can be optimised easily should we need to.
788            for (key, group) in &mut active_requests.requests {
789                if group.group_or_request == group_or_request {
790                    if group.req_id.is_some() {
791                        // We have already received a request tagged as last.
792                        if group.status == zx::Status::OK {
793                            group.status = zx::Status::INVALID_ARGS;
794                        }
795                        // Ignore this request.
796                        return Err(None);
797                    }
798                    // See if this is a continuation of a decompressed read.
799                    if group.status == zx::Status::OK
800                        && let Some(info) = &mut group.decompression_info
801                    {
802                        if let Ok(Operation::Read {
803                            device_block_offset,
804                            mut block_count,
805                            options,
806                            vmo_offset: 0,
807                            ..
808                        }) = operation
809                        {
810                            let remaining_bytes = info
811                                .compressed_range
812                                .end
813                                .next_multiple_of(self.block_size as usize)
814                                as u64
815                                - info.bytes_so_far;
816                            if !flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD)
817                                || request.total_compressed_bytes != 0
818                                || request.uncompressed_bytes != 0
819                                || request.compressed_prefix_bytes != 0
820                                || (flags.contains(BlockIoFlag::GROUP_LAST)
821                                    && info.bytes_so_far + request_bytes
822                                        < info.compressed_range.end as u64)
823                                || (!flags.contains(BlockIoFlag::GROUP_LAST)
824                                    && request_bytes >= remaining_bytes)
825                            {
826                                group.status = zx::Status::INVALID_ARGS;
827                            } else {
828                                // We are tolerant of `block_count` being more than we actually
829                                // need.  This can happen if the client is working with a larger
830                                // block size than the device block size.  For example, if Blobfs
831                                // has a 8192 byte block size, but the device might has a 512 byte
832                                // block size, it can ask for a multiple of 16 blocks, when fewer
833                                // than that might actually be required to hold the compressed data.
834                                // It is easier for us to tolerate this here than to get Blobfs to
835                                // change to pass only the blocks that are required.
836                                if request_bytes > remaining_bytes {
837                                    block_count = (remaining_bytes / self.block_size as u64) as u32;
838                                }
839
840                                operation = Ok(Operation::ContinueDecompressedRead {
841                                    offset: info.bytes_so_far,
842                                    device_block_offset,
843                                    block_count,
844                                    options,
845                                });
846
847                                info.bytes_so_far += block_count as u64 * self.block_size as u64;
848                            }
849                        } else {
850                            group.status = zx::Status::INVALID_ARGS;
851                        }
852                    }
853                    if flags.contains(BlockIoFlag::GROUP_LAST) {
854                        group.req_id = Some(request.reqid);
855                        // If the group has had an error, there is no point trying to issue this
856                        // request.
857                        if group.status != zx::Status::OK {
858                            operation = Err(group.status);
859                        }
860                    } else if group.status != zx::Status::OK {
861                        // The group has already encountered an error, so there is no point trying
862                        // to issue this request.
863                        return Err(None);
864                    }
865                    request_id = Some(RequestId(key));
866                    group.count += 1;
867                    break;
868                }
869            }
870        }
871
872        let is_single_request =
873            !flags.contains(BlockIoFlag::GROUP_ITEM) || flags.contains(BlockIoFlag::GROUP_LAST);
874
875        let mut decompression_info = None;
876        let vmo = match operation {
877            Ok(Operation::Read {
878                device_block_offset,
879                mut block_count,
880                options,
881                vmo_offset,
882                ..
883            }) => match self.vmos.lock().get_mut(&request.vmoid) {
884                Some((vmo, mapping)) => {
885                    if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
886                        let compressed_range = request.compressed_prefix_bytes as usize
887                            ..request.compressed_prefix_bytes as usize
888                                + request.total_compressed_bytes as usize;
889                        let required_buffer_size =
890                            compressed_range.end.next_multiple_of(self.block_size as usize);
891
892                        // Validate the initial decompression request.
893                        if compressed_range.start >= compressed_range.end
894                            || vmo_offset.checked_add(request.uncompressed_bytes as u64).is_none()
895                            || (is_single_request && request_bytes < compressed_range.end as u64)
896                            || (!is_single_request && request_bytes >= required_buffer_size as u64)
897                        {
898                            Err(zx::Status::INVALID_ARGS)
899                        } else {
900                            // We are tolerant of `block_count` being more than we actually need.
901                            // This can happen if the client is working in a larger block size than
902                            // the device block size.  For example, Blobfs has a 8192 byte block
903                            // size, but the device might have a 512 byte block size.  It is easier
904                            // for us to tolerate this here than to get Blobfs to change to pass
905                            // only the blocks that are required.
906                            let bytes_so_far = if request_bytes > required_buffer_size as u64 {
907                                block_count =
908                                    (required_buffer_size / self.block_size as usize) as u32;
909                                required_buffer_size as u64
910                            } else {
911                                request_bytes
912                            };
913
914                            // To decompress, we need to have the target VMO mapped (cached).
915                            match mapping {
916                                Some(mapping) => Ok(mapping.clone()),
917                                None => {
918                                    VmoMapping::new(&vmo).inspect(|m| *mapping = Some(m.clone()))
919                                }
920                            }
921                            .and_then(|mapping| {
922                                // Make sure the `vmo_offset` and `uncompressed_bytes` are within
923                                // range.
924                                if vmo_offset
925                                    .checked_add(request.uncompressed_bytes as u64)
926                                    .is_some_and(|end| end <= mapping.size as u64)
927                                {
928                                    Ok(mapping)
929                                } else {
930                                    Err(zx::Status::OUT_OF_RANGE)
931                                }
932                            })
933                            .map(|mapping| {
934                                // Convert the operation into a `StartDecompressedRead`
935                                // operation. For non-fragmented requests, this will be the only
936                                // operation, but if it's a fragmented read,
937                                // `ContinueDecompressedRead` operations will follow.
938                                operation = Ok(Operation::StartDecompressedRead {
939                                    required_buffer_size,
940                                    device_block_offset,
941                                    block_count,
942                                    options,
943                                });
944                                // Record sufficient information so that we can decompress when all
945                                // the requests complete.
946                                decompression_info = Some(DecompressionInfo {
947                                    compressed_range,
948                                    bytes_so_far,
949                                    mapping,
950                                    uncompressed_range: vmo_offset
951                                        ..vmo_offset + request.uncompressed_bytes as u64,
952                                    buffer: None,
953                                });
954                                None
955                            })
956                        }
957                    } else {
958                        Ok(Some(vmo.clone()))
959                    }
960                }
961                None => Err(zx::Status::IO),
962            },
963            Ok(Operation::Write { .. }) => self
964                .vmos
965                .lock()
966                .get(&request.vmoid)
967                .cloned()
968                .map_or(Err(zx::Status::IO), |(vmo, _)| Ok(Some(vmo))),
969            Ok(Operation::CloseVmo) => {
970                self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |(vmo, _)| {
971                    let vmo_clone = vmo.clone();
972                    // Make sure the VMO is dropped after all current Epoch guards have been
973                    // dropped.
974                    Epoch::global().defer(move || drop(vmo_clone));
975                    Ok(Some(vmo))
976                })
977            }
978            _ => Ok(None),
979        }
980        .unwrap_or_else(|e| {
981            operation = Err(e);
982            None
983        });
984
985        let trace_flow_id = NonZero::new(request.trace_flow_id);
986        let request_id = request_id.unwrap_or_else(|| {
987            RequestId(active_requests.requests.insert(ActiveRequest {
988                session,
989                group_or_request,
990                trace_flow_id,
991                _epoch_guard: Epoch::global().guard(),
992                status: zx::Status::OK,
993                count: 1,
994                req_id: is_single_request.then_some(request.reqid),
995                decompression_info,
996            }))
997        });
998
999        Ok(DecodedRequest {
1000            request_id,
1001            trace_flow_id,
1002            operation: operation.map_err(|status| {
1003                active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
1004            })?,
1005            vmo,
1006        })
1007    }
1008
1009    fn take_vmos(&self) -> BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)> {
1010        std::mem::take(&mut *self.vmos.lock())
1011    }
1012
1013    /// Maps the request and returns the mapped request with an optional remainder.
1014    fn map_request(
1015        &self,
1016        mut request: DecodedRequest,
1017        active_request: &mut ActiveRequest<SM::Session>,
1018    ) -> Result<(DecodedRequest, Option<DecodedRequest>), zx::Status> {
1019        if active_request.status != zx::Status::OK {
1020            return Err(zx::Status::BAD_STATE);
1021        }
1022        let mapping = self.offset_map.mapping();
1023        match (mapping, request.operation.blocks()) {
1024            (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
1025                return Err(zx::Status::OUT_OF_RANGE);
1026            }
1027            _ => {}
1028        }
1029        let remainder = request.operation.map(
1030            self.offset_map.mapping(),
1031            self.offset_map.max_transfer_blocks(),
1032            self.block_size,
1033        );
1034        if remainder.is_some() {
1035            active_request.count += 1;
1036        }
1037        static CACHE: AtomicU64 = AtomicU64::new(0);
1038        if let Some(context) =
1039            fuchsia_trace::TraceCategoryContext::acquire_cached(c"storage", &CACHE)
1040        {
1041            use fuchsia_trace::ArgValue;
1042            let trace_args = [
1043                ArgValue::of("request_id", request.request_id.0),
1044                ArgValue::of("opcode", request.operation.trace_label()),
1045            ];
1046            let _scope = fuchsia_trace::duration(
1047                c"storage",
1048                c"block_server::start_transaction",
1049                &trace_args,
1050            );
1051            if let Some(trace_flow_id) = active_request.trace_flow_id {
1052                fuchsia_trace::flow_step(
1053                    &context,
1054                    c"block_server::start_transaction",
1055                    trace_flow_id.get().into(),
1056                    &[],
1057                );
1058            }
1059        }
1060        let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
1061        Ok((request, remainder))
1062    }
1063
1064    fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
1065        self.session_manager.active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
1066    }
1067}
1068
1069#[repr(transparent)]
1070#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
1071pub struct RequestId(usize);
1072
1073#[derive(Clone, Debug)]
1074struct DecodedRequest {
1075    request_id: RequestId,
1076    trace_flow_id: TraceFlowId,
1077    operation: Operation,
1078    vmo: Option<Arc<zx::Vmo>>,
1079}
1080
1081/// cbindgen:no-export
1082pub type WriteFlags = block_protocol::WriteFlags;
1083pub type WriteOptions = block_protocol::WriteOptions;
1084pub type ReadOptions = block_protocol::ReadOptions;
1085
1086#[repr(C)]
1087#[derive(Clone, Debug, PartialEq, Eq)]
1088pub enum Operation {
1089    // NOTE: On the C++ side, this ends up as a union and, for efficiency reasons, there is code
1090    // that assumes that some fields for reads and writes (and possibly trim) line-up (e.g. common
1091    // code can read `device_block_offset` from the read variant and then assume it's valid for the
1092    // write variant).
1093    Read {
1094        device_block_offset: u64,
1095        block_count: u32,
1096        _unused: u32,
1097        vmo_offset: u64,
1098        options: ReadOptions,
1099    },
1100    Write {
1101        device_block_offset: u64,
1102        block_count: u32,
1103        _unused: u32,
1104        vmo_offset: u64,
1105        options: WriteOptions,
1106    },
1107    Flush,
1108    Trim {
1109        device_block_offset: u64,
1110        block_count: u32,
1111    },
1112    /// This will never be seen by the C interface.
1113    CloseVmo,
1114    StartDecompressedRead {
1115        required_buffer_size: usize,
1116        device_block_offset: u64,
1117        block_count: u32,
1118        options: ReadOptions,
1119    },
1120    ContinueDecompressedRead {
1121        offset: u64,
1122        device_block_offset: u64,
1123        block_count: u32,
1124        options: ReadOptions,
1125    },
1126}
1127
1128impl Operation {
1129    fn trace_label(&self) -> &'static str {
1130        match self {
1131            Operation::Read { .. } => "read",
1132            Operation::Write { .. } => "write",
1133            Operation::Flush { .. } => "flush",
1134            Operation::Trim { .. } => "trim",
1135            Operation::CloseVmo { .. } => "close_vmo",
1136            Operation::StartDecompressedRead { .. } => "start_decompressed_read",
1137            Operation::ContinueDecompressedRead { .. } => "continue_decompressed_read",
1138        }
1139    }
1140
1141    /// Returns (offset, length).
1142    fn blocks(&self) -> Option<(u64, u32)> {
1143        match self {
1144            Operation::Read { device_block_offset, block_count, .. }
1145            | Operation::Write { device_block_offset, block_count, .. }
1146            | Operation::Trim { device_block_offset, block_count, .. } => {
1147                Some((*device_block_offset, *block_count))
1148            }
1149            _ => None,
1150        }
1151    }
1152
1153    /// Returns mutable references to (offset, length).
1154    fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
1155        match self {
1156            Operation::Read { device_block_offset, block_count, .. }
1157            | Operation::Write { device_block_offset, block_count, .. }
1158            | Operation::Trim { device_block_offset, block_count, .. } => {
1159                Some((device_block_offset, block_count))
1160            }
1161            _ => None,
1162        }
1163    }
1164
1165    /// Maps the operation using `mapping` and returns the remainder.  `mapping` *must* overlap the
1166    /// start of the operation.
1167    fn map(
1168        &mut self,
1169        mapping: Option<&BlockOffsetMapping>,
1170        max_blocks: Option<NonZero<u32>>,
1171        block_size: u32,
1172    ) -> Option<Self> {
1173        let mut max = match self {
1174            Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
1175            _ => None,
1176        };
1177        let (offset, length) = self.blocks_mut()?;
1178        let orig_offset = *offset;
1179        if let Some(mapping) = mapping {
1180            let delta = *offset - mapping.source_block_offset;
1181            debug_assert!(*offset - mapping.source_block_offset < mapping.length);
1182            *offset = mapping.target_block_offset + delta;
1183            let mapping_max = mapping.target_block_offset + mapping.length - *offset;
1184            max = match max {
1185                None => Some(mapping_max),
1186                Some(m) => Some(std::cmp::min(m, mapping_max)),
1187            };
1188        };
1189        if let Some(max) = max {
1190            if *length as u64 > max {
1191                let rem = (*length as u64 - max) as u32;
1192                *length = max as u32;
1193                return Some(match self {
1194                    Operation::Read {
1195                        device_block_offset: _,
1196                        block_count: _,
1197                        vmo_offset,
1198                        _unused,
1199                        options,
1200                    } => Operation::Read {
1201                        device_block_offset: orig_offset + max,
1202                        block_count: rem,
1203                        vmo_offset: *vmo_offset + max * block_size as u64,
1204                        _unused: *_unused,
1205                        options: *options,
1206                    },
1207                    Operation::Write {
1208                        device_block_offset: _,
1209                        block_count: _,
1210                        _unused,
1211                        vmo_offset,
1212                        options,
1213                    } => Operation::Write {
1214                        device_block_offset: orig_offset + max,
1215                        block_count: rem,
1216                        _unused: *_unused,
1217                        vmo_offset: *vmo_offset + max * block_size as u64,
1218                        options: *options,
1219                    },
1220                    Operation::Trim { device_block_offset: _, block_count: _ } => {
1221                        Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
1222                    }
1223                    _ => unreachable!(),
1224                });
1225            }
1226        }
1227        None
1228    }
1229
1230    /// Returns true if the specified write flags are set.
1231    pub fn has_write_flag(&self, value: WriteFlags) -> bool {
1232        if let Operation::Write { options, .. } = self {
1233            options.flags.contains(value)
1234        } else {
1235            false
1236        }
1237    }
1238
1239    /// Removes `value` from the request's write flags and returns true if the flag was set.
1240    pub fn take_write_flag(&mut self, value: WriteFlags) -> bool {
1241        if let Operation::Write { options, .. } = self {
1242            let result = options.flags.contains(value);
1243            options.flags.remove(value);
1244            result
1245        } else {
1246            false
1247        }
1248    }
1249}
1250
1251#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
1252pub enum GroupOrRequest {
1253    Group(u16),
1254    Request(u32),
1255}
1256
1257impl GroupOrRequest {
1258    fn is_group(&self) -> bool {
1259        matches!(self, Self::Group(_))
1260    }
1261
1262    fn group_id(&self) -> Option<u16> {
1263        match self {
1264            Self::Group(id) => Some(*id),
1265            Self::Request(_) => None,
1266        }
1267    }
1268}
1269
1270#[cfg(test)]
1271mod tests {
1272    use super::{
1273        BlockOffsetMapping, BlockServer, DeviceInfo, FIFO_MAX_REQUESTS, Operation, PartitionInfo,
1274        TraceFlowId,
1275    };
1276    use assert_matches::assert_matches;
1277    use block_protocol::{
1278        BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, ReadOptions, WriteFlags,
1279        WriteOptions,
1280    };
1281    use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
1282    use fuchsia_sync::Mutex;
1283    use futures::FutureExt as _;
1284    use futures::channel::oneshot;
1285    use futures::future::BoxFuture;
1286    use std::borrow::Cow;
1287    use std::future::poll_fn;
1288    use std::num::NonZero;
1289    use std::pin::pin;
1290    use std::sync::Arc;
1291    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
1292    use std::task::{Context, Poll};
1293    use zx::{AsHandleRef as _, HandleBased as _};
1294    use {
1295        fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
1296        fuchsia_async as fasync,
1297    };
1298
1299    #[derive(Default)]
1300    struct MockInterface {
1301        read_hook: Option<
1302            Box<
1303                dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
1304                    + Send
1305                    + Sync,
1306            >,
1307        >,
1308        write_hook:
1309            Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
1310        barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
1311    }
1312
1313    impl super::async_interface::Interface for MockInterface {
1314        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1315            Ok(())
1316        }
1317
1318        fn get_info(&self) -> Cow<'_, DeviceInfo> {
1319            Cow::Owned(test_device_info())
1320        }
1321
1322        async fn read(
1323            &self,
1324            device_block_offset: u64,
1325            block_count: u32,
1326            vmo: &Arc<zx::Vmo>,
1327            vmo_offset: u64,
1328            _opts: ReadOptions,
1329            _trace_flow_id: TraceFlowId,
1330        ) -> Result<(), zx::Status> {
1331            if let Some(read_hook) = &self.read_hook {
1332                read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1333            } else {
1334                unimplemented!();
1335            }
1336        }
1337
1338        async fn write(
1339            &self,
1340            device_block_offset: u64,
1341            _block_count: u32,
1342            _vmo: &Arc<zx::Vmo>,
1343            _vmo_offset: u64,
1344            opts: WriteOptions,
1345            _trace_flow_id: TraceFlowId,
1346        ) -> Result<(), zx::Status> {
1347            if opts.flags.contains(WriteFlags::PRE_BARRIER)
1348                && let Some(barrier_hook) = &self.barrier_hook
1349            {
1350                barrier_hook()?;
1351            }
1352            if let Some(write_hook) = &self.write_hook {
1353                write_hook(device_block_offset).await
1354            } else {
1355                unimplemented!();
1356            }
1357        }
1358
1359        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1360            Ok(())
1361        }
1362
1363        async fn trim(
1364            &self,
1365            _device_block_offset: u64,
1366            _block_count: u32,
1367            _trace_flow_id: TraceFlowId,
1368        ) -> Result<(), zx::Status> {
1369            unreachable!();
1370        }
1371
1372        async fn get_volume_info(
1373            &self,
1374        ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
1375            // Hang forever for the test_requests_dont_block_sessions test.
1376            let () = std::future::pending().await;
1377            unreachable!();
1378        }
1379    }
1380
1381    const BLOCK_SIZE: u32 = 512;
1382    const MAX_TRANSFER_BLOCKS: u32 = 10;
1383
1384    fn test_device_info() -> DeviceInfo {
1385        DeviceInfo::Partition(PartitionInfo {
1386            device_flags: fblock::Flag::READONLY
1387                | fblock::Flag::BARRIER_SUPPORT
1388                | fblock::Flag::FUA_SUPPORT,
1389            max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1390            block_range: Some(0..100),
1391            type_guid: [1; 16],
1392            instance_guid: [2; 16],
1393            name: "foo".to_string(),
1394            flags: 0xabcd,
1395        })
1396    }
1397
1398    #[fuchsia::test]
1399    async fn test_barriers_ordering() {
1400        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1401        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1402        let barrier_called = Arc::new(AtomicBool::new(false));
1403
1404        futures::join!(
1405            async move {
1406                let barrier_called_clone = barrier_called.clone();
1407                let block_server = BlockServer::new(
1408                    BLOCK_SIZE,
1409                    Arc::new(MockInterface {
1410                        barrier_hook: Some(Box::new(move || {
1411                            barrier_called.store(true, Ordering::Relaxed);
1412                            Ok(())
1413                        })),
1414                        write_hook: Some(Box::new(move |device_block_offset| {
1415                            let barrier_called = barrier_called_clone.clone();
1416                            Box::pin(async move {
1417                                // The sleep allows the server to reorder the fifo requests.
1418                                if device_block_offset % 2 == 0 {
1419                                    fasync::Timer::new(fasync::MonotonicInstant::after(
1420                                        zx::MonotonicDuration::from_millis(200),
1421                                    ))
1422                                    .await;
1423                                }
1424                                assert!(barrier_called.load(Ordering::Relaxed));
1425                                Ok(())
1426                            })
1427                        })),
1428                        ..MockInterface::default()
1429                    }),
1430                );
1431                block_server.handle_requests(stream).await.unwrap();
1432            },
1433            async move {
1434                let (session_proxy, server) = fidl::endpoints::create_proxy();
1435
1436                proxy.open_session(server).unwrap();
1437
1438                let vmo_id = session_proxy
1439                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1440                    .await
1441                    .unwrap()
1442                    .unwrap();
1443                assert_ne!(vmo_id.id, 0);
1444
1445                let mut fifo =
1446                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1447                let (mut reader, mut writer) = fifo.async_io();
1448
1449                writer
1450                    .write_entries(&BlockFifoRequest {
1451                        command: BlockFifoCommand {
1452                            opcode: BlockOpcode::Write.into_primitive(),
1453                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1454                            ..Default::default()
1455                        },
1456                        vmoid: vmo_id.id,
1457                        dev_offset: 0,
1458                        length: 5,
1459                        vmo_offset: 6,
1460                        ..Default::default()
1461                    })
1462                    .await
1463                    .unwrap();
1464
1465                for i in 0..10 {
1466                    writer
1467                        .write_entries(&BlockFifoRequest {
1468                            command: BlockFifoCommand {
1469                                opcode: BlockOpcode::Write.into_primitive(),
1470                                ..Default::default()
1471                            },
1472                            vmoid: vmo_id.id,
1473                            dev_offset: i + 1,
1474                            length: 5,
1475                            vmo_offset: 6,
1476                            ..Default::default()
1477                        })
1478                        .await
1479                        .unwrap();
1480                }
1481                for _ in 0..11 {
1482                    let mut response = BlockFifoResponse::default();
1483                    reader.read_entries(&mut response).await.unwrap();
1484                    assert_eq!(response.status, zx::sys::ZX_OK);
1485                }
1486
1487                std::mem::drop(proxy);
1488            }
1489        );
1490    }
1491
1492    #[fuchsia::test]
1493    async fn test_info() {
1494        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1495
1496        futures::join!(
1497            async {
1498                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1499                block_server.handle_requests(stream).await.unwrap();
1500            },
1501            async {
1502                let expected_info = test_device_info();
1503                let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1504                    info
1505                } else {
1506                    unreachable!()
1507                };
1508
1509                let block_info = proxy.get_info().await.unwrap().unwrap();
1510                assert_eq!(
1511                    block_info.block_count,
1512                    partition_info.block_range.as_ref().unwrap().end
1513                        - partition_info.block_range.as_ref().unwrap().start
1514                );
1515                assert_eq!(
1516                    block_info.flags,
1517                    fblock::Flag::READONLY
1518                        | fblock::Flag::ZSTD_DECOMPRESSION_SUPPORT
1519                        | fblock::Flag::BARRIER_SUPPORT
1520                        | fblock::Flag::FUA_SUPPORT
1521                );
1522
1523                assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1524
1525                let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1526                assert_eq!(status, zx::sys::ZX_OK);
1527                assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1528
1529                let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1530                assert_eq!(status, zx::sys::ZX_OK);
1531                assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1532
1533                let (status, name) = proxy.get_name().await.unwrap();
1534                assert_eq!(status, zx::sys::ZX_OK);
1535                assert_eq!(name.as_ref(), Some(&partition_info.name));
1536
1537                let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1538                assert_eq!(metadata.name, name);
1539                assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1540                assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1541                assert_eq!(
1542                    metadata.start_block_offset,
1543                    Some(partition_info.block_range.as_ref().unwrap().start)
1544                );
1545                assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1546                assert_eq!(metadata.flags, Some(partition_info.flags));
1547
1548                std::mem::drop(proxy);
1549            }
1550        );
1551    }
1552
1553    #[fuchsia::test]
1554    async fn test_attach_vmo() {
1555        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1556
1557        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1558        let koid = vmo.get_koid().unwrap();
1559
1560        futures::join!(
1561            async {
1562                let block_server = BlockServer::new(
1563                    BLOCK_SIZE,
1564                    Arc::new(MockInterface {
1565                        read_hook: Some(Box::new(move |_, _, vmo, _| {
1566                            assert_eq!(vmo.get_koid().unwrap(), koid);
1567                            Box::pin(async { Ok(()) })
1568                        })),
1569                        ..MockInterface::default()
1570                    }),
1571                );
1572                block_server.handle_requests(stream).await.unwrap();
1573            },
1574            async move {
1575                let (session_proxy, server) = fidl::endpoints::create_proxy();
1576
1577                proxy.open_session(server).unwrap();
1578
1579                let vmo_id = session_proxy
1580                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1581                    .await
1582                    .unwrap()
1583                    .unwrap();
1584                assert_ne!(vmo_id.id, 0);
1585
1586                let mut fifo =
1587                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1588                let (mut reader, mut writer) = fifo.async_io();
1589
1590                // Keep attaching VMOs until we eventually hit the maximum.
1591                let mut count = 1;
1592                loop {
1593                    match session_proxy
1594                        .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1595                        .await
1596                        .unwrap()
1597                    {
1598                        Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1599                        Err(e) => {
1600                            assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1601                            break;
1602                        }
1603                    }
1604
1605                    // Only test every 10 to keep test time down.
1606                    if count % 10 == 0 {
1607                        writer
1608                            .write_entries(&BlockFifoRequest {
1609                                command: BlockFifoCommand {
1610                                    opcode: BlockOpcode::Read.into_primitive(),
1611                                    ..Default::default()
1612                                },
1613                                vmoid: vmo_id.id,
1614                                length: 1,
1615                                ..Default::default()
1616                            })
1617                            .await
1618                            .unwrap();
1619
1620                        let mut response = BlockFifoResponse::default();
1621                        reader.read_entries(&mut response).await.unwrap();
1622                        assert_eq!(response.status, zx::sys::ZX_OK);
1623                    }
1624
1625                    count += 1;
1626                }
1627
1628                assert_eq!(count, u16::MAX as u64);
1629
1630                // Detach the original VMO, and make sure we can then attach another one.
1631                writer
1632                    .write_entries(&BlockFifoRequest {
1633                        command: BlockFifoCommand {
1634                            opcode: BlockOpcode::CloseVmo.into_primitive(),
1635                            ..Default::default()
1636                        },
1637                        vmoid: vmo_id.id,
1638                        ..Default::default()
1639                    })
1640                    .await
1641                    .unwrap();
1642
1643                let mut response = BlockFifoResponse::default();
1644                reader.read_entries(&mut response).await.unwrap();
1645                assert_eq!(response.status, zx::sys::ZX_OK);
1646
1647                let new_vmo_id = session_proxy
1648                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1649                    .await
1650                    .unwrap()
1651                    .unwrap();
1652                // It should reuse the same ID.
1653                assert_eq!(new_vmo_id.id, vmo_id.id);
1654
1655                std::mem::drop(proxy);
1656            }
1657        );
1658    }
1659
1660    #[fuchsia::test]
1661    async fn test_close() {
1662        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1663
1664        let mut server = std::pin::pin!(
1665            async {
1666                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1667                block_server.handle_requests(stream).await.unwrap();
1668            }
1669            .fuse()
1670        );
1671
1672        let mut client = std::pin::pin!(
1673            async {
1674                let (session_proxy, server) = fidl::endpoints::create_proxy();
1675
1676                proxy.open_session(server).unwrap();
1677
1678                // Dropping the proxy should not cause the session to terminate because the session is
1679                // still live.
1680                std::mem::drop(proxy);
1681
1682                session_proxy.close().await.unwrap().unwrap();
1683
1684                // Keep the session alive.  Calling `close` should cause the server to terminate.
1685                let _: () = std::future::pending().await;
1686            }
1687            .fuse()
1688        );
1689
1690        futures::select!(
1691            _ = server => {}
1692            _ = client => unreachable!(),
1693        );
1694    }
1695
1696    #[derive(Default)]
1697    struct IoMockInterface {
1698        do_checks: bool,
1699        expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1700        return_errors: bool,
1701    }
1702
1703    #[derive(Debug)]
1704    enum ExpectedOp {
1705        Read(u64, u32, u64),
1706        Write(u64, u32, u64),
1707        Trim(u64, u32),
1708        Flush,
1709    }
1710
1711    impl super::async_interface::Interface for IoMockInterface {
1712        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1713            Ok(())
1714        }
1715
1716        fn get_info(&self) -> Cow<'_, DeviceInfo> {
1717            Cow::Owned(test_device_info())
1718        }
1719
1720        async fn read(
1721            &self,
1722            device_block_offset: u64,
1723            block_count: u32,
1724            _vmo: &Arc<zx::Vmo>,
1725            vmo_offset: u64,
1726            _opts: ReadOptions,
1727            _trace_flow_id: TraceFlowId,
1728        ) -> Result<(), zx::Status> {
1729            if self.return_errors {
1730                Err(zx::Status::INTERNAL)
1731            } else {
1732                if self.do_checks {
1733                    assert_matches!(
1734                        self.expected_op.lock().take(),
1735                        Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1736                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1737                        "Read {device_block_offset} {block_count} {vmo_offset}"
1738                    );
1739                }
1740                Ok(())
1741            }
1742        }
1743
1744        async fn write(
1745            &self,
1746            device_block_offset: u64,
1747            block_count: u32,
1748            _vmo: &Arc<zx::Vmo>,
1749            vmo_offset: u64,
1750            _write_opts: WriteOptions,
1751            _trace_flow_id: TraceFlowId,
1752        ) -> Result<(), zx::Status> {
1753            if self.return_errors {
1754                Err(zx::Status::NOT_SUPPORTED)
1755            } else {
1756                if self.do_checks {
1757                    assert_matches!(
1758                        self.expected_op.lock().take(),
1759                        Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1760                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1761                        "Write {device_block_offset} {block_count} {vmo_offset}"
1762                    );
1763                }
1764                Ok(())
1765            }
1766        }
1767
1768        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1769            if self.return_errors {
1770                Err(zx::Status::NO_RESOURCES)
1771            } else {
1772                if self.do_checks {
1773                    assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1774                }
1775                Ok(())
1776            }
1777        }
1778
1779        async fn trim(
1780            &self,
1781            device_block_offset: u64,
1782            block_count: u32,
1783            _trace_flow_id: TraceFlowId,
1784        ) -> Result<(), zx::Status> {
1785            if self.return_errors {
1786                Err(zx::Status::NO_MEMORY)
1787            } else {
1788                if self.do_checks {
1789                    assert_matches!(
1790                        self.expected_op.lock().take(),
1791                        Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1792                            block_count == b,
1793                        "Trim {device_block_offset} {block_count}"
1794                    );
1795                }
1796                Ok(())
1797            }
1798        }
1799    }
1800
1801    #[fuchsia::test]
1802    async fn test_io() {
1803        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1804
1805        let expected_op = Arc::new(Mutex::new(None));
1806        let expected_op_clone = expected_op.clone();
1807
1808        let server = async {
1809            let block_server = BlockServer::new(
1810                BLOCK_SIZE,
1811                Arc::new(IoMockInterface {
1812                    return_errors: false,
1813                    do_checks: true,
1814                    expected_op: expected_op_clone,
1815                }),
1816            );
1817            block_server.handle_requests(stream).await.unwrap();
1818        };
1819
1820        let client = async move {
1821            let (session_proxy, server) = fidl::endpoints::create_proxy();
1822
1823            proxy.open_session(server).unwrap();
1824
1825            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1826            let vmo_id = session_proxy
1827                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1828                .await
1829                .unwrap()
1830                .unwrap();
1831
1832            let mut fifo =
1833                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1834            let (mut reader, mut writer) = fifo.async_io();
1835
1836            // READ
1837            *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1838            writer
1839                .write_entries(&BlockFifoRequest {
1840                    command: BlockFifoCommand {
1841                        opcode: BlockOpcode::Read.into_primitive(),
1842                        ..Default::default()
1843                    },
1844                    vmoid: vmo_id.id,
1845                    dev_offset: 1,
1846                    length: 2,
1847                    vmo_offset: 3,
1848                    ..Default::default()
1849                })
1850                .await
1851                .unwrap();
1852
1853            let mut response = BlockFifoResponse::default();
1854            reader.read_entries(&mut response).await.unwrap();
1855            assert_eq!(response.status, zx::sys::ZX_OK);
1856
1857            // WRITE
1858            *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1859            writer
1860                .write_entries(&BlockFifoRequest {
1861                    command: BlockFifoCommand {
1862                        opcode: BlockOpcode::Write.into_primitive(),
1863                        ..Default::default()
1864                    },
1865                    vmoid: vmo_id.id,
1866                    dev_offset: 4,
1867                    length: 5,
1868                    vmo_offset: 6,
1869                    ..Default::default()
1870                })
1871                .await
1872                .unwrap();
1873
1874            let mut response = BlockFifoResponse::default();
1875            reader.read_entries(&mut response).await.unwrap();
1876            assert_eq!(response.status, zx::sys::ZX_OK);
1877
1878            // FLUSH
1879            *expected_op.lock() = Some(ExpectedOp::Flush);
1880            writer
1881                .write_entries(&BlockFifoRequest {
1882                    command: BlockFifoCommand {
1883                        opcode: BlockOpcode::Flush.into_primitive(),
1884                        ..Default::default()
1885                    },
1886                    ..Default::default()
1887                })
1888                .await
1889                .unwrap();
1890
1891            reader.read_entries(&mut response).await.unwrap();
1892            assert_eq!(response.status, zx::sys::ZX_OK);
1893
1894            // TRIM
1895            *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1896            writer
1897                .write_entries(&BlockFifoRequest {
1898                    command: BlockFifoCommand {
1899                        opcode: BlockOpcode::Trim.into_primitive(),
1900                        ..Default::default()
1901                    },
1902                    dev_offset: 7,
1903                    length: 8,
1904                    ..Default::default()
1905                })
1906                .await
1907                .unwrap();
1908
1909            reader.read_entries(&mut response).await.unwrap();
1910            assert_eq!(response.status, zx::sys::ZX_OK);
1911
1912            std::mem::drop(proxy);
1913        };
1914
1915        futures::join!(server, client);
1916    }
1917
1918    #[fuchsia::test]
1919    async fn test_io_errors() {
1920        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1921
1922        futures::join!(
1923            async {
1924                let block_server = BlockServer::new(
1925                    BLOCK_SIZE,
1926                    Arc::new(IoMockInterface {
1927                        return_errors: true,
1928                        do_checks: false,
1929                        expected_op: Arc::new(Mutex::new(None)),
1930                    }),
1931                );
1932                block_server.handle_requests(stream).await.unwrap();
1933            },
1934            async move {
1935                let (session_proxy, server) = fidl::endpoints::create_proxy();
1936
1937                proxy.open_session(server).unwrap();
1938
1939                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1940                let vmo_id = session_proxy
1941                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1942                    .await
1943                    .unwrap()
1944                    .unwrap();
1945
1946                let mut fifo =
1947                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1948                let (mut reader, mut writer) = fifo.async_io();
1949
1950                // READ
1951                writer
1952                    .write_entries(&BlockFifoRequest {
1953                        command: BlockFifoCommand {
1954                            opcode: BlockOpcode::Read.into_primitive(),
1955                            ..Default::default()
1956                        },
1957                        vmoid: vmo_id.id,
1958                        length: 1,
1959                        reqid: 1,
1960                        ..Default::default()
1961                    })
1962                    .await
1963                    .unwrap();
1964
1965                let mut response = BlockFifoResponse::default();
1966                reader.read_entries(&mut response).await.unwrap();
1967                assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1968
1969                // WRITE
1970                writer
1971                    .write_entries(&BlockFifoRequest {
1972                        command: BlockFifoCommand {
1973                            opcode: BlockOpcode::Write.into_primitive(),
1974                            ..Default::default()
1975                        },
1976                        vmoid: vmo_id.id,
1977                        length: 1,
1978                        reqid: 2,
1979                        ..Default::default()
1980                    })
1981                    .await
1982                    .unwrap();
1983
1984                reader.read_entries(&mut response).await.unwrap();
1985                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1986
1987                // FLUSH
1988                writer
1989                    .write_entries(&BlockFifoRequest {
1990                        command: BlockFifoCommand {
1991                            opcode: BlockOpcode::Flush.into_primitive(),
1992                            ..Default::default()
1993                        },
1994                        reqid: 3,
1995                        ..Default::default()
1996                    })
1997                    .await
1998                    .unwrap();
1999
2000                reader.read_entries(&mut response).await.unwrap();
2001                assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
2002
2003                // TRIM
2004                writer
2005                    .write_entries(&BlockFifoRequest {
2006                        command: BlockFifoCommand {
2007                            opcode: BlockOpcode::Trim.into_primitive(),
2008                            ..Default::default()
2009                        },
2010                        reqid: 4,
2011                        length: 1,
2012                        ..Default::default()
2013                    })
2014                    .await
2015                    .unwrap();
2016
2017                reader.read_entries(&mut response).await.unwrap();
2018                assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
2019
2020                std::mem::drop(proxy);
2021            }
2022        );
2023    }
2024
2025    #[fuchsia::test]
2026    async fn test_invalid_args() {
2027        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2028
2029        futures::join!(
2030            async {
2031                let block_server = BlockServer::new(
2032                    BLOCK_SIZE,
2033                    Arc::new(IoMockInterface {
2034                        return_errors: false,
2035                        do_checks: false,
2036                        expected_op: Arc::new(Mutex::new(None)),
2037                    }),
2038                );
2039                block_server.handle_requests(stream).await.unwrap();
2040            },
2041            async move {
2042                let (session_proxy, server) = fidl::endpoints::create_proxy();
2043
2044                proxy.open_session(server).unwrap();
2045
2046                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2047                let vmo_id = session_proxy
2048                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2049                    .await
2050                    .unwrap()
2051                    .unwrap();
2052
2053                let mut fifo =
2054                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2055
2056                async fn test(
2057                    fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
2058                    request: BlockFifoRequest,
2059                ) -> Result<(), zx::Status> {
2060                    let (mut reader, mut writer) = fifo.async_io();
2061                    writer.write_entries(&request).await.unwrap();
2062                    let mut response = BlockFifoResponse::default();
2063                    reader.read_entries(&mut response).await.unwrap();
2064                    zx::Status::ok(response.status)
2065                }
2066
2067                // READ
2068
2069                let good_read_request = || BlockFifoRequest {
2070                    command: BlockFifoCommand {
2071                        opcode: BlockOpcode::Read.into_primitive(),
2072                        ..Default::default()
2073                    },
2074                    length: 1,
2075                    vmoid: vmo_id.id,
2076                    ..Default::default()
2077                };
2078
2079                assert_eq!(
2080                    test(
2081                        &mut fifo,
2082                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
2083                    )
2084                    .await,
2085                    Err(zx::Status::IO)
2086                );
2087
2088                assert_eq!(
2089                    test(
2090                        &mut fifo,
2091                        BlockFifoRequest {
2092                            vmo_offset: 0xffff_ffff_ffff_ffff,
2093                            ..good_read_request()
2094                        }
2095                    )
2096                    .await,
2097                    Err(zx::Status::OUT_OF_RANGE)
2098                );
2099
2100                assert_eq!(
2101                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
2102                    Err(zx::Status::INVALID_ARGS)
2103                );
2104
2105                // WRITE
2106
2107                let good_write_request = || BlockFifoRequest {
2108                    command: BlockFifoCommand {
2109                        opcode: BlockOpcode::Write.into_primitive(),
2110                        ..Default::default()
2111                    },
2112                    length: 1,
2113                    vmoid: vmo_id.id,
2114                    ..Default::default()
2115                };
2116
2117                assert_eq!(
2118                    test(
2119                        &mut fifo,
2120                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
2121                    )
2122                    .await,
2123                    Err(zx::Status::IO)
2124                );
2125
2126                assert_eq!(
2127                    test(
2128                        &mut fifo,
2129                        BlockFifoRequest {
2130                            vmo_offset: 0xffff_ffff_ffff_ffff,
2131                            ..good_write_request()
2132                        }
2133                    )
2134                    .await,
2135                    Err(zx::Status::OUT_OF_RANGE)
2136                );
2137
2138                assert_eq!(
2139                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2140                    Err(zx::Status::INVALID_ARGS)
2141                );
2142
2143                // CLOSE VMO
2144
2145                assert_eq!(
2146                    test(
2147                        &mut fifo,
2148                        BlockFifoRequest {
2149                            command: BlockFifoCommand {
2150                                opcode: BlockOpcode::CloseVmo.into_primitive(),
2151                                ..Default::default()
2152                            },
2153                            vmoid: vmo_id.id + 1,
2154                            ..Default::default()
2155                        }
2156                    )
2157                    .await,
2158                    Err(zx::Status::IO)
2159                );
2160
2161                std::mem::drop(proxy);
2162            }
2163        );
2164    }
2165
2166    #[fuchsia::test]
2167    async fn test_concurrent_requests() {
2168        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2169
2170        let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2171        let waiting_readers_clone = waiting_readers.clone();
2172
2173        futures::join!(
2174            async move {
2175                let block_server = BlockServer::new(
2176                    BLOCK_SIZE,
2177                    Arc::new(MockInterface {
2178                        read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2179                            let (tx, rx) = oneshot::channel();
2180                            waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2181                            Box::pin(async move {
2182                                let _ = rx.await;
2183                                Ok(())
2184                            })
2185                        })),
2186                        ..MockInterface::default()
2187                    }),
2188                );
2189                block_server.handle_requests(stream).await.unwrap();
2190            },
2191            async move {
2192                let (session_proxy, server) = fidl::endpoints::create_proxy();
2193
2194                proxy.open_session(server).unwrap();
2195
2196                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2197                let vmo_id = session_proxy
2198                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2199                    .await
2200                    .unwrap()
2201                    .unwrap();
2202
2203                let mut fifo =
2204                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2205                let (mut reader, mut writer) = fifo.async_io();
2206
2207                writer
2208                    .write_entries(&BlockFifoRequest {
2209                        command: BlockFifoCommand {
2210                            opcode: BlockOpcode::Read.into_primitive(),
2211                            ..Default::default()
2212                        },
2213                        reqid: 1,
2214                        dev_offset: 1, // Intentionally use the same as `reqid`.
2215                        vmoid: vmo_id.id,
2216                        length: 1,
2217                        ..Default::default()
2218                    })
2219                    .await
2220                    .unwrap();
2221
2222                writer
2223                    .write_entries(&BlockFifoRequest {
2224                        command: BlockFifoCommand {
2225                            opcode: BlockOpcode::Read.into_primitive(),
2226                            ..Default::default()
2227                        },
2228                        reqid: 2,
2229                        dev_offset: 2,
2230                        vmoid: vmo_id.id,
2231                        length: 1,
2232                        ..Default::default()
2233                    })
2234                    .await
2235                    .unwrap();
2236
2237                // Wait till both those entries are pending.
2238                poll_fn(|cx: &mut Context<'_>| {
2239                    if waiting_readers.lock().len() == 2 {
2240                        Poll::Ready(())
2241                    } else {
2242                        // Yield to the executor.
2243                        cx.waker().wake_by_ref();
2244                        Poll::Pending
2245                    }
2246                })
2247                .await;
2248
2249                let mut response = BlockFifoResponse::default();
2250                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2251
2252                let (id, tx) = waiting_readers.lock().pop().unwrap();
2253                tx.send(()).unwrap();
2254
2255                reader.read_entries(&mut response).await.unwrap();
2256                assert_eq!(response.status, zx::sys::ZX_OK);
2257                assert_eq!(response.reqid, id);
2258
2259                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2260
2261                let (id, tx) = waiting_readers.lock().pop().unwrap();
2262                tx.send(()).unwrap();
2263
2264                reader.read_entries(&mut response).await.unwrap();
2265                assert_eq!(response.status, zx::sys::ZX_OK);
2266                assert_eq!(response.reqid, id);
2267            }
2268        );
2269    }
2270
2271    #[fuchsia::test]
2272    async fn test_groups() {
2273        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2274
2275        futures::join!(
2276            async move {
2277                let block_server = BlockServer::new(
2278                    BLOCK_SIZE,
2279                    Arc::new(MockInterface {
2280                        read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2281                        ..MockInterface::default()
2282                    }),
2283                );
2284                block_server.handle_requests(stream).await.unwrap();
2285            },
2286            async move {
2287                let (session_proxy, server) = fidl::endpoints::create_proxy();
2288
2289                proxy.open_session(server).unwrap();
2290
2291                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2292                let vmo_id = session_proxy
2293                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2294                    .await
2295                    .unwrap()
2296                    .unwrap();
2297
2298                let mut fifo =
2299                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2300                let (mut reader, mut writer) = fifo.async_io();
2301
2302                writer
2303                    .write_entries(&BlockFifoRequest {
2304                        command: BlockFifoCommand {
2305                            opcode: BlockOpcode::Read.into_primitive(),
2306                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2307                            ..Default::default()
2308                        },
2309                        group: 1,
2310                        vmoid: vmo_id.id,
2311                        length: 1,
2312                        ..Default::default()
2313                    })
2314                    .await
2315                    .unwrap();
2316
2317                writer
2318                    .write_entries(&BlockFifoRequest {
2319                        command: BlockFifoCommand {
2320                            opcode: BlockOpcode::Read.into_primitive(),
2321                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2322                            ..Default::default()
2323                        },
2324                        reqid: 2,
2325                        group: 1,
2326                        vmoid: vmo_id.id,
2327                        length: 1,
2328                        ..Default::default()
2329                    })
2330                    .await
2331                    .unwrap();
2332
2333                let mut response = BlockFifoResponse::default();
2334                reader.read_entries(&mut response).await.unwrap();
2335                assert_eq!(response.status, zx::sys::ZX_OK);
2336                assert_eq!(response.reqid, 2);
2337                assert_eq!(response.group, 1);
2338            }
2339        );
2340    }
2341
2342    #[fuchsia::test]
2343    async fn test_group_error() {
2344        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2345
2346        let counter = Arc::new(AtomicU64::new(0));
2347        let counter_clone = counter.clone();
2348
2349        futures::join!(
2350            async move {
2351                let block_server = BlockServer::new(
2352                    BLOCK_SIZE,
2353                    Arc::new(MockInterface {
2354                        read_hook: Some(Box::new(move |_, _, _, _| {
2355                            counter_clone.fetch_add(1, Ordering::Relaxed);
2356                            Box::pin(async { Err(zx::Status::BAD_STATE) })
2357                        })),
2358                        ..MockInterface::default()
2359                    }),
2360                );
2361                block_server.handle_requests(stream).await.unwrap();
2362            },
2363            async move {
2364                let (session_proxy, server) = fidl::endpoints::create_proxy();
2365
2366                proxy.open_session(server).unwrap();
2367
2368                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2369                let vmo_id = session_proxy
2370                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2371                    .await
2372                    .unwrap()
2373                    .unwrap();
2374
2375                let mut fifo =
2376                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2377                let (mut reader, mut writer) = fifo.async_io();
2378
2379                writer
2380                    .write_entries(&BlockFifoRequest {
2381                        command: BlockFifoCommand {
2382                            opcode: BlockOpcode::Read.into_primitive(),
2383                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2384                            ..Default::default()
2385                        },
2386                        group: 1,
2387                        vmoid: vmo_id.id,
2388                        length: 1,
2389                        ..Default::default()
2390                    })
2391                    .await
2392                    .unwrap();
2393
2394                // Wait until processed.
2395                poll_fn(|cx: &mut Context<'_>| {
2396                    if counter.load(Ordering::Relaxed) == 1 {
2397                        Poll::Ready(())
2398                    } else {
2399                        // Yield to the executor.
2400                        cx.waker().wake_by_ref();
2401                        Poll::Pending
2402                    }
2403                })
2404                .await;
2405
2406                let mut response = BlockFifoResponse::default();
2407                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2408
2409                writer
2410                    .write_entries(&BlockFifoRequest {
2411                        command: BlockFifoCommand {
2412                            opcode: BlockOpcode::Read.into_primitive(),
2413                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2414                            ..Default::default()
2415                        },
2416                        group: 1,
2417                        vmoid: vmo_id.id,
2418                        length: 1,
2419                        ..Default::default()
2420                    })
2421                    .await
2422                    .unwrap();
2423
2424                writer
2425                    .write_entries(&BlockFifoRequest {
2426                        command: BlockFifoCommand {
2427                            opcode: BlockOpcode::Read.into_primitive(),
2428                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2429                            ..Default::default()
2430                        },
2431                        reqid: 2,
2432                        group: 1,
2433                        vmoid: vmo_id.id,
2434                        length: 1,
2435                        ..Default::default()
2436                    })
2437                    .await
2438                    .unwrap();
2439
2440                reader.read_entries(&mut response).await.unwrap();
2441                assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2442                assert_eq!(response.reqid, 2);
2443                assert_eq!(response.group, 1);
2444
2445                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2446
2447                // Only the first request should have been processed.
2448                assert_eq!(counter.load(Ordering::Relaxed), 1);
2449            }
2450        );
2451    }
2452
2453    #[fuchsia::test]
2454    async fn test_group_with_two_lasts() {
2455        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2456
2457        let (tx, rx) = oneshot::channel();
2458
2459        futures::join!(
2460            async move {
2461                let rx = Mutex::new(Some(rx));
2462                let block_server = BlockServer::new(
2463                    BLOCK_SIZE,
2464                    Arc::new(MockInterface {
2465                        read_hook: Some(Box::new(move |_, _, _, _| {
2466                            let rx = rx.lock().take().unwrap();
2467                            Box::pin(async {
2468                                let _ = rx.await;
2469                                Ok(())
2470                            })
2471                        })),
2472                        ..MockInterface::default()
2473                    }),
2474                );
2475                block_server.handle_requests(stream).await.unwrap();
2476            },
2477            async move {
2478                let (session_proxy, server) = fidl::endpoints::create_proxy();
2479
2480                proxy.open_session(server).unwrap();
2481
2482                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2483                let vmo_id = session_proxy
2484                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2485                    .await
2486                    .unwrap()
2487                    .unwrap();
2488
2489                let mut fifo =
2490                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2491                let (mut reader, mut writer) = fifo.async_io();
2492
2493                writer
2494                    .write_entries(&BlockFifoRequest {
2495                        command: BlockFifoCommand {
2496                            opcode: BlockOpcode::Read.into_primitive(),
2497                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2498                            ..Default::default()
2499                        },
2500                        reqid: 1,
2501                        group: 1,
2502                        vmoid: vmo_id.id,
2503                        length: 1,
2504                        ..Default::default()
2505                    })
2506                    .await
2507                    .unwrap();
2508
2509                writer
2510                    .write_entries(&BlockFifoRequest {
2511                        command: BlockFifoCommand {
2512                            opcode: BlockOpcode::Read.into_primitive(),
2513                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2514                            ..Default::default()
2515                        },
2516                        reqid: 2,
2517                        group: 1,
2518                        vmoid: vmo_id.id,
2519                        length: 1,
2520                        ..Default::default()
2521                    })
2522                    .await
2523                    .unwrap();
2524
2525                // Send an independent request to flush through the fifo.
2526                writer
2527                    .write_entries(&BlockFifoRequest {
2528                        command: BlockFifoCommand {
2529                            opcode: BlockOpcode::CloseVmo.into_primitive(),
2530                            ..Default::default()
2531                        },
2532                        reqid: 3,
2533                        vmoid: vmo_id.id,
2534                        ..Default::default()
2535                    })
2536                    .await
2537                    .unwrap();
2538
2539                // It should succeed.
2540                let mut response = BlockFifoResponse::default();
2541                reader.read_entries(&mut response).await.unwrap();
2542                assert_eq!(response.status, zx::sys::ZX_OK);
2543                assert_eq!(response.reqid, 3);
2544
2545                // Now release the original request.
2546                tx.send(()).unwrap();
2547
2548                // The response should be for the first message tagged as last, and it should be
2549                // an error because we sent two messages with the LAST marker.
2550                let mut response = BlockFifoResponse::default();
2551                reader.read_entries(&mut response).await.unwrap();
2552                assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2553                assert_eq!(response.reqid, 1);
2554                assert_eq!(response.group, 1);
2555            }
2556        );
2557    }
2558
2559    #[fuchsia::test(allow_stalls = false)]
2560    async fn test_requests_dont_block_sessions() {
2561        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2562
2563        let (tx, rx) = oneshot::channel();
2564
2565        fasync::Task::local(async move {
2566            let rx = Mutex::new(Some(rx));
2567            let block_server = BlockServer::new(
2568                BLOCK_SIZE,
2569                Arc::new(MockInterface {
2570                    read_hook: Some(Box::new(move |_, _, _, _| {
2571                        let rx = rx.lock().take().unwrap();
2572                        Box::pin(async {
2573                            let _ = rx.await;
2574                            Ok(())
2575                        })
2576                    })),
2577                    ..MockInterface::default()
2578                }),
2579            );
2580            block_server.handle_requests(stream).await.unwrap();
2581        })
2582        .detach();
2583
2584        let mut fut = pin!(async {
2585            let (session_proxy, server) = fidl::endpoints::create_proxy();
2586
2587            proxy.open_session(server).unwrap();
2588
2589            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2590            let vmo_id = session_proxy
2591                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2592                .await
2593                .unwrap()
2594                .unwrap();
2595
2596            let mut fifo =
2597                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2598            let (mut reader, mut writer) = fifo.async_io();
2599
2600            writer
2601                .write_entries(&BlockFifoRequest {
2602                    command: BlockFifoCommand {
2603                        opcode: BlockOpcode::Read.into_primitive(),
2604                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2605                        ..Default::default()
2606                    },
2607                    reqid: 1,
2608                    group: 1,
2609                    vmoid: vmo_id.id,
2610                    length: 1,
2611                    ..Default::default()
2612                })
2613                .await
2614                .unwrap();
2615
2616            let mut response = BlockFifoResponse::default();
2617            reader.read_entries(&mut response).await.unwrap();
2618            assert_eq!(response.status, zx::sys::ZX_OK);
2619        });
2620
2621        // The response won't come back until we send on `tx`.
2622        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2623
2624        let mut fut2 = pin!(proxy.get_volume_info());
2625
2626        // get_volume_info is set up to stall forever.
2627        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2628
2629        // If we now free up the first future, it should resolve; the stalled call to
2630        // get_volume_info should not block the fifo response.
2631        let _ = tx.send(());
2632
2633        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2634    }
2635
2636    #[fuchsia::test]
2637    async fn test_request_flow_control() {
2638        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2639
2640        // The client will ensure that MAX_REQUESTS are queued up before firing `event`, and the
2641        // server will block until that happens.
2642        const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2643        let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2644        let event_clone = event.clone();
2645        futures::join!(
2646            async move {
2647                let block_server = BlockServer::new(
2648                    BLOCK_SIZE,
2649                    Arc::new(MockInterface {
2650                        read_hook: Some(Box::new(move |_, _, _, _| {
2651                            let event_clone = event_clone.clone();
2652                            Box::pin(async move {
2653                                if !event_clone.1.load(Ordering::SeqCst) {
2654                                    event_clone.0.listen().await;
2655                                }
2656                                Ok(())
2657                            })
2658                        })),
2659                        ..MockInterface::default()
2660                    }),
2661                );
2662                block_server.handle_requests(stream).await.unwrap();
2663            },
2664            async move {
2665                let (session_proxy, server) = fidl::endpoints::create_proxy();
2666
2667                proxy.open_session(server).unwrap();
2668
2669                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2670                let vmo_id = session_proxy
2671                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2672                    .await
2673                    .unwrap()
2674                    .unwrap();
2675
2676                let mut fifo =
2677                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2678                let (mut reader, mut writer) = fifo.async_io();
2679
2680                for i in 0..MAX_REQUESTS {
2681                    writer
2682                        .write_entries(&BlockFifoRequest {
2683                            command: BlockFifoCommand {
2684                                opcode: BlockOpcode::Read.into_primitive(),
2685                                ..Default::default()
2686                            },
2687                            reqid: (i + 1) as u32,
2688                            dev_offset: i,
2689                            vmoid: vmo_id.id,
2690                            length: 1,
2691                            ..Default::default()
2692                        })
2693                        .await
2694                        .unwrap();
2695                }
2696                assert!(
2697                    futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2698                        command: BlockFifoCommand {
2699                            opcode: BlockOpcode::Read.into_primitive(),
2700                            ..Default::default()
2701                        },
2702                        reqid: u32::MAX,
2703                        dev_offset: MAX_REQUESTS,
2704                        vmoid: vmo_id.id,
2705                        length: 1,
2706                        ..Default::default()
2707                    })))
2708                    .is_pending()
2709                );
2710                // OK, let the server start to process.
2711                event.1.store(true, Ordering::SeqCst);
2712                event.0.notify(usize::MAX);
2713                // For each entry we read, make sure we can write a new one in.
2714                let mut finished_reqids = vec![];
2715                for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2716                    let mut response = BlockFifoResponse::default();
2717                    reader.read_entries(&mut response).await.unwrap();
2718                    assert_eq!(response.status, zx::sys::ZX_OK);
2719                    finished_reqids.push(response.reqid);
2720                    writer
2721                        .write_entries(&BlockFifoRequest {
2722                            command: BlockFifoCommand {
2723                                opcode: BlockOpcode::Read.into_primitive(),
2724                                ..Default::default()
2725                            },
2726                            reqid: (i + 1) as u32,
2727                            dev_offset: i,
2728                            vmoid: vmo_id.id,
2729                            length: 1,
2730                            ..Default::default()
2731                        })
2732                        .await
2733                        .unwrap();
2734                }
2735                let mut response = BlockFifoResponse::default();
2736                for _ in 0..MAX_REQUESTS {
2737                    reader.read_entries(&mut response).await.unwrap();
2738                    assert_eq!(response.status, zx::sys::ZX_OK);
2739                    finished_reqids.push(response.reqid);
2740                }
2741                // Verify that we got a response for each request.  Note that we can't assume FIFO
2742                // ordering.
2743                finished_reqids.sort();
2744                assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2745                let mut i = 1;
2746                for reqid in finished_reqids {
2747                    assert_eq!(reqid, i);
2748                    i += 1;
2749                }
2750            }
2751        );
2752    }
2753
2754    #[fuchsia::test]
2755    async fn test_passthrough_io_with_fixed_map() {
2756        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2757
2758        let expected_op = Arc::new(Mutex::new(None));
2759        let expected_op_clone = expected_op.clone();
2760        futures::join!(
2761            async {
2762                let block_server = BlockServer::new(
2763                    BLOCK_SIZE,
2764                    Arc::new(IoMockInterface {
2765                        return_errors: false,
2766                        do_checks: true,
2767                        expected_op: expected_op_clone,
2768                    }),
2769                );
2770                block_server.handle_requests(stream).await.unwrap();
2771            },
2772            async move {
2773                let (session_proxy, server) = fidl::endpoints::create_proxy();
2774
2775                let mapping = fblock::BlockOffsetMapping {
2776                    source_block_offset: 0,
2777                    target_block_offset: 10,
2778                    length: 20,
2779                };
2780                proxy.open_session_with_offset_map(server, &mapping).unwrap();
2781
2782                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2783                let vmo_id = session_proxy
2784                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2785                    .await
2786                    .unwrap()
2787                    .unwrap();
2788
2789                let mut fifo =
2790                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2791                let (mut reader, mut writer) = fifo.async_io();
2792
2793                // READ
2794                *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2795                writer
2796                    .write_entries(&BlockFifoRequest {
2797                        command: BlockFifoCommand {
2798                            opcode: BlockOpcode::Read.into_primitive(),
2799                            ..Default::default()
2800                        },
2801                        vmoid: vmo_id.id,
2802                        dev_offset: 1,
2803                        length: 2,
2804                        vmo_offset: 3,
2805                        ..Default::default()
2806                    })
2807                    .await
2808                    .unwrap();
2809
2810                let mut response = BlockFifoResponse::default();
2811                reader.read_entries(&mut response).await.unwrap();
2812                assert_eq!(response.status, zx::sys::ZX_OK);
2813
2814                // WRITE
2815                *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2816                writer
2817                    .write_entries(&BlockFifoRequest {
2818                        command: BlockFifoCommand {
2819                            opcode: BlockOpcode::Write.into_primitive(),
2820                            ..Default::default()
2821                        },
2822                        vmoid: vmo_id.id,
2823                        dev_offset: 4,
2824                        length: 5,
2825                        vmo_offset: 6,
2826                        ..Default::default()
2827                    })
2828                    .await
2829                    .unwrap();
2830
2831                reader.read_entries(&mut response).await.unwrap();
2832                assert_eq!(response.status, zx::sys::ZX_OK);
2833
2834                // FLUSH
2835                *expected_op.lock() = Some(ExpectedOp::Flush);
2836                writer
2837                    .write_entries(&BlockFifoRequest {
2838                        command: BlockFifoCommand {
2839                            opcode: BlockOpcode::Flush.into_primitive(),
2840                            ..Default::default()
2841                        },
2842                        ..Default::default()
2843                    })
2844                    .await
2845                    .unwrap();
2846
2847                reader.read_entries(&mut response).await.unwrap();
2848                assert_eq!(response.status, zx::sys::ZX_OK);
2849
2850                // TRIM
2851                *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2852                writer
2853                    .write_entries(&BlockFifoRequest {
2854                        command: BlockFifoCommand {
2855                            opcode: BlockOpcode::Trim.into_primitive(),
2856                            ..Default::default()
2857                        },
2858                        dev_offset: 7,
2859                        length: 3,
2860                        ..Default::default()
2861                    })
2862                    .await
2863                    .unwrap();
2864
2865                reader.read_entries(&mut response).await.unwrap();
2866                assert_eq!(response.status, zx::sys::ZX_OK);
2867
2868                // READ past window
2869                *expected_op.lock() = None;
2870                writer
2871                    .write_entries(&BlockFifoRequest {
2872                        command: BlockFifoCommand {
2873                            opcode: BlockOpcode::Read.into_primitive(),
2874                            ..Default::default()
2875                        },
2876                        vmoid: vmo_id.id,
2877                        dev_offset: 19,
2878                        length: 2,
2879                        vmo_offset: 3,
2880                        ..Default::default()
2881                    })
2882                    .await
2883                    .unwrap();
2884
2885                reader.read_entries(&mut response).await.unwrap();
2886                assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2887
2888                std::mem::drop(proxy);
2889            }
2890        );
2891    }
2892
2893    #[fuchsia::test]
2894    fn operation_map() {
2895        fn expect_map_result(
2896            mut operation: Operation,
2897            mapping: Option<BlockOffsetMapping>,
2898            max_blocks: Option<NonZero<u32>>,
2899            expected_operations: Vec<Operation>,
2900        ) {
2901            let mut ops = vec![];
2902            while let Some(remainder) = operation.map(mapping.as_ref(), max_blocks.clone(), 512) {
2903                ops.push(operation);
2904                operation = remainder;
2905            }
2906            ops.push(operation);
2907            assert_eq!(ops, expected_operations);
2908        }
2909
2910        // No limits
2911        expect_map_result(
2912            Operation::Read {
2913                device_block_offset: 10,
2914                block_count: 200,
2915                _unused: 0,
2916                vmo_offset: 0,
2917                options: ReadOptions::default(),
2918            },
2919            None,
2920            None,
2921            vec![Operation::Read {
2922                device_block_offset: 10,
2923                block_count: 200,
2924                _unused: 0,
2925                vmo_offset: 0,
2926                options: ReadOptions::default(),
2927            }],
2928        );
2929
2930        // Max block count
2931        expect_map_result(
2932            Operation::Read {
2933                device_block_offset: 10,
2934                block_count: 200,
2935                _unused: 0,
2936                vmo_offset: 0,
2937                options: ReadOptions::default(),
2938            },
2939            None,
2940            NonZero::new(120),
2941            vec![
2942                Operation::Read {
2943                    device_block_offset: 10,
2944                    block_count: 120,
2945                    _unused: 0,
2946                    vmo_offset: 0,
2947                    options: ReadOptions::default(),
2948                },
2949                Operation::Read {
2950                    device_block_offset: 130,
2951                    block_count: 80,
2952                    _unused: 0,
2953                    vmo_offset: 120 * 512,
2954                    options: ReadOptions::default(),
2955                },
2956            ],
2957        );
2958        expect_map_result(
2959            Operation::Trim { device_block_offset: 10, block_count: 200 },
2960            None,
2961            NonZero::new(120),
2962            vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
2963        );
2964
2965        // Remapping + Max block count
2966        expect_map_result(
2967            Operation::Read {
2968                device_block_offset: 10,
2969                block_count: 200,
2970                _unused: 0,
2971                vmo_offset: 0,
2972                options: ReadOptions::default(),
2973            },
2974            Some(BlockOffsetMapping {
2975                source_block_offset: 10,
2976                target_block_offset: 100,
2977                length: 200,
2978            }),
2979            NonZero::new(120),
2980            vec![
2981                Operation::Read {
2982                    device_block_offset: 100,
2983                    block_count: 120,
2984                    _unused: 0,
2985                    vmo_offset: 0,
2986                    options: ReadOptions::default(),
2987                },
2988                Operation::Read {
2989                    device_block_offset: 220,
2990                    block_count: 80,
2991                    _unused: 0,
2992                    vmo_offset: 120 * 512,
2993                    options: ReadOptions::default(),
2994                },
2995            ],
2996        );
2997        expect_map_result(
2998            Operation::Trim { device_block_offset: 10, block_count: 200 },
2999            Some(BlockOffsetMapping {
3000                source_block_offset: 10,
3001                target_block_offset: 100,
3002                length: 200,
3003            }),
3004            NonZero::new(120),
3005            vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
3006        );
3007    }
3008
3009    // Verifies that if the pre-flush (for a simulated barrier) fails, the write is not executed.
3010    #[fuchsia::test]
3011    async fn test_pre_barrier_flush_failure() {
3012        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
3013
3014        struct NoBarrierInterface;
3015        impl super::async_interface::Interface for NoBarrierInterface {
3016            fn get_info(&self) -> Cow<'_, DeviceInfo> {
3017                Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3018                    device_flags: fblock::Flag::empty(), // No BARRIER_SUPPORT
3019                    max_transfer_blocks: NonZero::new(100),
3020                    block_range: Some(0..100),
3021                    type_guid: [0; 16],
3022                    instance_guid: [0; 16],
3023                    name: "test".to_string(),
3024                    flags: 0,
3025                }))
3026            }
3027            async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3028                Ok(())
3029            }
3030            async fn read(
3031                &self,
3032                _: u64,
3033                _: u32,
3034                _: &Arc<zx::Vmo>,
3035                _: u64,
3036                _: ReadOptions,
3037                _: TraceFlowId,
3038            ) -> Result<(), zx::Status> {
3039                unreachable!()
3040            }
3041            async fn write(
3042                &self,
3043                _: u64,
3044                _: u32,
3045                _: &Arc<zx::Vmo>,
3046                _: u64,
3047                _: WriteOptions,
3048                _: TraceFlowId,
3049            ) -> Result<(), zx::Status> {
3050                panic!("Write should not be called");
3051            }
3052            async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3053                Err(zx::Status::IO)
3054            }
3055            async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3056                unreachable!()
3057            }
3058        }
3059
3060        futures::join!(
3061            async move {
3062                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3063                block_server.handle_requests(stream).await.unwrap();
3064            },
3065            async move {
3066                let (session_proxy, server) = fidl::endpoints::create_proxy();
3067                proxy.open_session(server).unwrap();
3068                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3069                let vmo_id = session_proxy
3070                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3071                    .await
3072                    .unwrap()
3073                    .unwrap();
3074
3075                let mut fifo =
3076                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3077                let (mut reader, mut writer) = fifo.async_io();
3078
3079                writer
3080                    .write_entries(&BlockFifoRequest {
3081                        command: BlockFifoCommand {
3082                            opcode: BlockOpcode::Write.into_primitive(),
3083                            flags: BlockIoFlag::PRE_BARRIER.bits(),
3084                            ..Default::default()
3085                        },
3086                        vmoid: vmo_id.id,
3087                        length: 1,
3088                        ..Default::default()
3089                    })
3090                    .await
3091                    .unwrap();
3092
3093                let mut response = BlockFifoResponse::default();
3094                reader.read_entries(&mut response).await.unwrap();
3095                assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3096            }
3097        );
3098    }
3099
3100    // Verifies that if the write fails when a post-flush is required (for a simulated FUA), the
3101    // post-flush is not executed.
3102    #[fuchsia::test]
3103    async fn test_post_barrier_write_failure() {
3104        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
3105
3106        struct NoBarrierInterface;
3107        impl super::async_interface::Interface for NoBarrierInterface {
3108            fn get_info(&self) -> Cow<'_, DeviceInfo> {
3109                Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3110                    device_flags: fblock::Flag::empty(), // No FUA_SUPPORT
3111                    max_transfer_blocks: NonZero::new(100),
3112                    block_range: Some(0..100),
3113                    type_guid: [0; 16],
3114                    instance_guid: [0; 16],
3115                    name: "test".to_string(),
3116                    flags: 0,
3117                }))
3118            }
3119            async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3120                Ok(())
3121            }
3122            async fn read(
3123                &self,
3124                _: u64,
3125                _: u32,
3126                _: &Arc<zx::Vmo>,
3127                _: u64,
3128                _: ReadOptions,
3129                _: TraceFlowId,
3130            ) -> Result<(), zx::Status> {
3131                unreachable!()
3132            }
3133            async fn write(
3134                &self,
3135                _: u64,
3136                _: u32,
3137                _: &Arc<zx::Vmo>,
3138                _: u64,
3139                _: WriteOptions,
3140                _: TraceFlowId,
3141            ) -> Result<(), zx::Status> {
3142                Err(zx::Status::IO)
3143            }
3144            async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3145                panic!("Flush should not be called")
3146            }
3147            async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3148                unreachable!()
3149            }
3150        }
3151
3152        futures::join!(
3153            async move {
3154                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3155                block_server.handle_requests(stream).await.unwrap();
3156            },
3157            async move {
3158                let (session_proxy, server) = fidl::endpoints::create_proxy();
3159                proxy.open_session(server).unwrap();
3160                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3161                let vmo_id = session_proxy
3162                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3163                    .await
3164                    .unwrap()
3165                    .unwrap();
3166
3167                let mut fifo =
3168                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3169                let (mut reader, mut writer) = fifo.async_io();
3170
3171                writer
3172                    .write_entries(&BlockFifoRequest {
3173                        command: BlockFifoCommand {
3174                            opcode: BlockOpcode::Write.into_primitive(),
3175                            flags: BlockIoFlag::FORCE_ACCESS.bits(),
3176                            ..Default::default()
3177                        },
3178                        vmoid: vmo_id.id,
3179                        length: 1,
3180                        ..Default::default()
3181                    })
3182                    .await
3183                    .unwrap();
3184
3185                let mut response = BlockFifoResponse::default();
3186                reader.read_entries(&mut response).await.unwrap();
3187                assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3188            }
3189        );
3190    }
3191}