Skip to main content

block_server/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use anyhow::{Error, anyhow};
5use block_protocol::{BlockFifoRequest, BlockFifoResponse};
6use fblock::{BlockIoFlag, BlockOpcode, MAX_TRANSFER_UNBOUNDED};
7use fuchsia_async::epoch::{Epoch, EpochGuard};
8use fuchsia_sync::{MappedMutexGuard, Mutex, MutexGuard};
9use futures::{Future, FutureExt as _, TryStreamExt as _};
10use slab::Slab;
11use std::borrow::{Borrow, Cow};
12use std::collections::BTreeMap;
13use std::num::NonZero;
14use std::ops::Range;
15use std::sync::Arc;
16use std::sync::atomic::AtomicU64;
17use storage_device::buffer::Buffer;
18use zx::HandleBased;
19use {fidl_fuchsia_storage_block as fblock, fuchsia_async as fasync};
20
21pub mod async_interface;
22pub mod c_interface;
23pub mod callback_interface;
24
25#[cfg(test)]
26mod decompression_tests;
27
28pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
29
30type TraceFlowId = Option<NonZero<u64>>;
31
32#[derive(Clone)]
33pub enum DeviceInfo {
34    Block(BlockInfo),
35    Partition(PartitionInfo),
36}
37
38impl DeviceInfo {
39    pub fn label(&self) -> &str {
40        match self {
41            Self::Block(BlockInfo { .. }) => "",
42            Self::Partition(PartitionInfo { name, .. }) => name,
43        }
44    }
45    pub fn device_flags(&self) -> fblock::DeviceFlag {
46        match self {
47            Self::Block(BlockInfo { device_flags, .. }) => *device_flags,
48            Self::Partition(PartitionInfo { device_flags, .. }) => *device_flags,
49        }
50    }
51
52    pub fn block_count(&self) -> Option<u64> {
53        match self {
54            Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
55            Self::Partition(PartitionInfo { block_range, .. }) => {
56                block_range.as_ref().map(|range| range.end - range.start)
57            }
58        }
59    }
60
61    pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
62        match self {
63            Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
64            Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
65                max_transfer_blocks.clone()
66            }
67        }
68    }
69
70    fn max_transfer_size(&self, block_size: u32) -> u32 {
71        if let Some(max_blocks) = self.max_transfer_blocks() {
72            max_blocks.get() * block_size
73        } else {
74            MAX_TRANSFER_UNBOUNDED
75        }
76    }
77}
78
79/// Information associated with non-partition block devices.
80#[derive(Clone, Default)]
81pub struct BlockInfo {
82    pub device_flags: fblock::DeviceFlag,
83    pub block_count: u64,
84    pub max_transfer_blocks: Option<NonZero<u32>>,
85}
86
87/// Information associated with a block device that is also a partition.
88#[derive(Clone, Default)]
89pub struct PartitionInfo {
90    /// The device flags reported by the underlying device.
91    pub device_flags: fblock::DeviceFlag,
92    pub max_transfer_blocks: Option<NonZero<u32>>,
93    /// If `block_range` is None, the partition is a volume and may not be contiguous.
94    /// In this case, the server will use the `get_volume_info` method to get the count of assigned
95    /// slices and use that (along with the slice and block sizes) to determine the block count.
96    pub block_range: Option<Range<u64>>,
97    pub type_guid: [u8; 16],
98    pub instance_guid: [u8; 16],
99    pub name: String,
100    pub flags: u64,
101}
102
103/// We internally keep track of active requests, so that when the server is torn down, we can
104/// deallocate all of the resources for pending requests.
105struct ActiveRequest<S> {
106    session: S,
107    group_or_request: GroupOrRequest,
108    trace_flow_id: TraceFlowId,
109    _epoch_guard: EpochGuard<'static>,
110    status: zx::Status,
111    count: u32,
112    req_id: Option<u32>,
113    decompression_info: Option<DecompressionInfo>,
114}
115
116struct DecompressionInfo {
117    // This is the range of compressed bytes in receiving buffer.
118    compressed_range: Range<usize>,
119
120    // This is the range in the target VMO where we will write uncompressed bytes.
121    uncompressed_range: Range<u64>,
122
123    bytes_so_far: u64,
124    mapping: Arc<VmoMapping>,
125    buffer: Option<Buffer<'static>>,
126}
127
128impl DecompressionInfo {
129    /// Returns the uncompressed slice.
130    fn uncompressed_slice(&self) -> *mut [u8] {
131        std::ptr::slice_from_raw_parts_mut(
132            (self.mapping.base + self.uncompressed_range.start as usize) as *mut u8,
133            (self.uncompressed_range.end - self.uncompressed_range.start) as usize,
134        )
135    }
136}
137
138pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
139
140impl<S> Default for ActiveRequests<S> {
141    fn default() -> Self {
142        Self(Mutex::new(ActiveRequestsInner { requests: Slab::default() }))
143    }
144}
145
146impl<S> ActiveRequests<S> {
147    fn complete_and_take_response(
148        &self,
149        request_id: RequestId,
150        status: zx::Status,
151    ) -> Option<(S, BlockFifoResponse)> {
152        self.0.lock().complete_and_take_response(request_id, status)
153    }
154
155    fn request(&self, request_id: RequestId) -> MappedMutexGuard<'_, ActiveRequest<S>> {
156        MutexGuard::map(self.0.lock(), |i| &mut i.requests[request_id.0])
157    }
158}
159
160struct ActiveRequestsInner<S> {
161    requests: Slab<ActiveRequest<S>>,
162}
163
164// Keeps track of all the requests that are currently being processed
165impl<S> ActiveRequestsInner<S> {
166    /// Completes a request.
167    fn complete(&mut self, request_id: RequestId, status: zx::Status) {
168        let group = &mut self.requests[request_id.0];
169
170        group.count = group.count.checked_sub(1).unwrap();
171        if status != zx::Status::OK && group.status == zx::Status::OK {
172            group.status = status
173        }
174
175        fuchsia_trace::duration!(
176            "storage",
177            "block_server::finish_transaction",
178            "request_id" => request_id.0,
179            "group_completed" => group.count == 0,
180            "status" => status.into_raw());
181        if let Some(trace_flow_id) = group.trace_flow_id {
182            fuchsia_trace::flow_step!(
183                "storage",
184                "block_server::finish_request",
185                trace_flow_id.get().into()
186            );
187        }
188
189        if group.count == 0
190            && group.status == zx::Status::OK
191            && let Some(info) = &mut group.decompression_info
192        {
193            thread_local! {
194                static DECOMPRESSOR: std::cell::RefCell<zstd::bulk::Decompressor<'static>> =
195                    std::cell::RefCell::new(zstd::bulk::Decompressor::new().unwrap());
196            }
197            DECOMPRESSOR.with(|decompressor| {
198                // SAFETY: We verified `uncompressed_range` fits within our mapping.
199                let target_slice = unsafe { info.uncompressed_slice().as_mut().unwrap() };
200                let mut decompressor = decompressor.borrow_mut();
201                if let Err(error) = decompressor.decompress_to_buffer(
202                    &info.buffer.take().unwrap().as_slice()[info.compressed_range.clone()],
203                    target_slice,
204                ) {
205                    log::warn!(error:?; "Decompression error");
206                    group.status = zx::Status::IO_DATA_INTEGRITY;
207                };
208            });
209        }
210    }
211
212    /// Takes the response if all requests are finished.
213    fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
214        let group = &self.requests[request_id.0];
215        match group.req_id {
216            Some(reqid) if group.count == 0 => {
217                let group = self.requests.remove(request_id.0);
218                Some((
219                    group.session,
220                    BlockFifoResponse {
221                        status: group.status.into_raw(),
222                        reqid,
223                        group: group.group_or_request.group_id().unwrap_or(0),
224                        ..Default::default()
225                    },
226                ))
227            }
228            _ => None,
229        }
230    }
231
232    /// Competes the request and returns a response if the request group is finished.
233    fn complete_and_take_response(
234        &mut self,
235        request_id: RequestId,
236        status: zx::Status,
237    ) -> Option<(S, BlockFifoResponse)> {
238        self.complete(request_id, status);
239        self.take_response(request_id)
240    }
241}
242
243/// BlockServer is an implementation of fuchsia.hardware.block.partition.Partition.
244/// cbindgen:no-export
245pub struct BlockServer<SM: SessionManager> {
246    block_size: u32,
247    orchestrator: Arc<SM::Orchestrator>,
248}
249
250/// A single entry in `[OffsetMap]`.
251#[derive(Debug)]
252pub struct BlockOffsetMapping {
253    source_block_offset: u64,
254    target_block_offset: u64,
255    length: u64,
256}
257
258impl BlockOffsetMapping {
259    fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
260        blocks.0 >= self.source_block_offset
261            && blocks.0 + blocks.1 as u64 - self.source_block_offset <= self.length
262    }
263}
264
265impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
266    type Error = zx::Status;
267
268    fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
269        wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
270        wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
271        Ok(Self {
272            source_block_offset: wire.source_block_offset,
273            target_block_offset: wire.target_block_offset,
274            length: wire.length,
275        })
276    }
277}
278
279/// Remaps the offset of block requests based on an internal map, and truncates long requests.
280pub struct OffsetMap {
281    mapping: Option<BlockOffsetMapping>,
282    max_transfer_blocks: Option<NonZero<u32>>,
283}
284
285impl OffsetMap {
286    /// An OffsetMap that remaps requests.
287    pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
288        Self { mapping: Some(mapping), max_transfer_blocks }
289    }
290
291    /// An OffsetMap that just enforces maximum request sizes.
292    pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
293        Self { mapping: None, max_transfer_blocks }
294    }
295
296    pub fn is_empty(&self) -> bool {
297        self.mapping.is_none()
298    }
299
300    fn mapping(&self) -> Option<&BlockOffsetMapping> {
301        self.mapping.as_ref()
302    }
303
304    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
305        self.max_transfer_blocks
306    }
307}
308
309// Methods take Arc<Self> rather than &self because of
310// https://github.com/rust-lang/rust/issues/42940.
311pub trait SessionManager: 'static {
312    /// The Orchestrator is an object that holds the `SessionManager` and any other state that needs
313    /// to be shared between sessions.  It is responsible for keeping the `SessionManager` alive.
314    /// We use this type instead of directly holding an Arc<SessionManager> in BlockServer, to avoid
315    /// nested Arcs in concrete implementations which need to keep additional state.
316    type Orchestrator: Borrow<Self> + Send + Sync;
317
318    const SUPPORTS_DECOMPRESSION: bool;
319
320    type Session;
321
322    fn on_attach_vmo(
323        orchestrator: Arc<Self::Orchestrator>,
324        vmo: &Arc<zx::Vmo>,
325    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
326
327    /// Creates a new session to handle `stream`.
328    /// The returned future should run until the session completes, for example when the client end
329    /// closes.
330    /// `offset_map`, will be used to adjust the block offset/length of FIFO requests.
331    fn open_session(
332        orchestrator: Arc<Self::Orchestrator>,
333        stream: fblock::SessionRequestStream,
334        offset_map: OffsetMap,
335        block_size: u32,
336    ) -> impl Future<Output = Result<(), Error>> + Send;
337
338    /// Called to get block/partition information for Block::GetInfo, Partition::GetTypeGuid, etc.
339    fn get_info(&self) -> Cow<'_, DeviceInfo>;
340
341    /// Called to handle the GetVolumeInfo FIDL call.
342    fn get_volume_info(
343        &self,
344    ) -> impl Future<Output = Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status>> + Send
345    {
346        async { Err(zx::Status::NOT_SUPPORTED) }
347    }
348
349    /// Called to handle the QuerySlices FIDL call.
350    fn query_slices(
351        &self,
352        _start_slices: &[u64],
353    ) -> impl Future<Output = Result<Vec<fblock::VsliceRange>, zx::Status>> + Send {
354        async { Err(zx::Status::NOT_SUPPORTED) }
355    }
356
357    /// Called to handle the Shrink FIDL call.
358    fn extend(
359        &self,
360        _start_slice: u64,
361        _slice_count: u64,
362    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
363        async { Err(zx::Status::NOT_SUPPORTED) }
364    }
365
366    /// Called to handle the Shrink FIDL call.
367    fn shrink(
368        &self,
369        _start_slice: u64,
370        _slice_count: u64,
371    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
372        async { Err(zx::Status::NOT_SUPPORTED) }
373    }
374
375    /// Returns the active requests.
376    fn active_requests(&self) -> &ActiveRequests<Self::Session>;
377}
378
379/// A helper trait for converting various types into an `Orchestrator`.
380///
381/// This exists to simplify [`BlockServer::new`].
382pub trait IntoOrchestrator {
383    type SM: SessionManager;
384
385    fn into_orchestrator(self) -> Arc<<Self::SM as SessionManager>::Orchestrator>;
386}
387
388impl<SM: SessionManager> BlockServer<SM> {
389    pub fn new(block_size: u32, orchestrator: impl IntoOrchestrator<SM = SM>) -> Self {
390        Self { block_size, orchestrator: orchestrator.into_orchestrator() }
391    }
392
393    pub fn session_manager(&self) -> &SM {
394        self.orchestrator.as_ref().borrow()
395    }
396
397    /// Called to process requests for fuchsia.storage.block.Block.
398    pub async fn handle_requests(
399        &self,
400        mut requests: fblock::BlockRequestStream,
401    ) -> Result<(), Error> {
402        let scope = fasync::Scope::new();
403        loop {
404            match requests.try_next().await {
405                Ok(Some(request)) => {
406                    if let Some(session) = self.handle_request(request).await? {
407                        scope.spawn(session.map(|_| ()));
408                    }
409                }
410                Ok(None) => break,
411                Err(err) => log::warn!(err:?; "Invalid request"),
412            }
413        }
414        scope.await;
415        Ok(())
416    }
417
418    /// Processes a Block request.  If a new session task is created in response to the request,
419    /// it is returned.
420    async fn handle_request(
421        &self,
422        request: fblock::BlockRequest,
423    ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send + use<SM>>, Error> {
424        match request {
425            fblock::BlockRequest::GetInfo { responder } => {
426                let info = self.device_info();
427                let max_transfer_size = info.max_transfer_size(self.block_size);
428                let (block_count, mut flags) = match info.as_ref() {
429                    DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
430                        (*block_count, *device_flags)
431                    }
432                    DeviceInfo::Partition(partition_info) => {
433                        let block_count = if let Some(range) = partition_info.block_range.as_ref() {
434                            range.end - range.start
435                        } else {
436                            let volume_info = self.session_manager().get_volume_info().await?;
437                            volume_info.0.slice_size * volume_info.1.partition_slice_count
438                                / self.block_size as u64
439                        };
440                        (block_count, partition_info.device_flags)
441                    }
442                };
443                if SM::SUPPORTS_DECOMPRESSION {
444                    flags |= fblock::DeviceFlag::ZSTD_DECOMPRESSION_SUPPORT;
445                }
446                responder.send(Ok(&fblock::BlockInfo {
447                    block_count,
448                    block_size: self.block_size,
449                    max_transfer_size,
450                    flags,
451                }))?;
452            }
453            fblock::BlockRequest::OpenSession { session, control_handle: _ } => {
454                let info = self.device_info();
455                return Ok(Some(SM::open_session(
456                    self.orchestrator.clone(),
457                    session.into_stream(),
458                    OffsetMap::empty(info.max_transfer_blocks()),
459                    self.block_size,
460                )));
461            }
462            fblock::BlockRequest::OpenSessionWithOffsetMap {
463                session,
464                mapping,
465                control_handle: _,
466            } => {
467                let info = self.device_info();
468                let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
469                    Ok(m) => m,
470                    Err(status) => {
471                        session.close_with_epitaph(status)?;
472                        return Ok(None);
473                    }
474                };
475                if let Some(max) = info.block_count() {
476                    if initial_mapping.target_block_offset + initial_mapping.length > max {
477                        log::warn!("Invalid mapping for session: {initial_mapping:?} (max {max})");
478                        session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
479                        return Ok(None);
480                    }
481                }
482                return Ok(Some(SM::open_session(
483                    self.orchestrator.clone(),
484                    session.into_stream(),
485                    OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
486                    self.block_size,
487                )));
488            }
489            fblock::BlockRequest::GetTypeGuid { responder } => {
490                let info = self.device_info();
491                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
492                    let mut guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
493                    guid.value.copy_from_slice(&partition_info.type_guid);
494                    responder.send(zx::sys::ZX_OK, Some(&guid))?;
495                } else {
496                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
497                }
498            }
499            fblock::BlockRequest::GetInstanceGuid { responder } => {
500                let info = self.device_info();
501                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
502                    let mut guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
503                    guid.value.copy_from_slice(&partition_info.instance_guid);
504                    responder.send(zx::sys::ZX_OK, Some(&guid))?;
505                } else {
506                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
507                }
508            }
509            fblock::BlockRequest::GetName { responder } => {
510                let info = self.device_info();
511                if let DeviceInfo::Partition(partition_info) = info.as_ref() {
512                    responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
513                } else {
514                    responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
515                }
516            }
517            fblock::BlockRequest::GetMetadata { responder } => {
518                let info = self.device_info();
519                if let DeviceInfo::Partition(info) = info.as_ref() {
520                    let mut type_guid = fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
521                    type_guid.value.copy_from_slice(&info.type_guid);
522                    let mut instance_guid =
523                        fblock::Guid { value: [0u8; fblock::GUID_LENGTH as usize] };
524                    instance_guid.value.copy_from_slice(&info.instance_guid);
525                    responder.send(Ok(&fblock::BlockGetMetadataResponse {
526                        name: Some(info.name.clone()),
527                        type_guid: Some(type_guid),
528                        instance_guid: Some(instance_guid),
529                        start_block_offset: info.block_range.as_ref().map(|range| range.start),
530                        num_blocks: info.block_range.as_ref().map(|range| range.end - range.start),
531                        flags: Some(info.flags),
532                        ..Default::default()
533                    }))?;
534                } else {
535                    responder.send(Err(zx::sys::ZX_ERR_NOT_SUPPORTED))?;
536                }
537            }
538            fblock::BlockRequest::QuerySlices { responder, start_slices } => {
539                match self.session_manager().query_slices(&start_slices).await {
540                    Ok(mut results) => {
541                        let results_len = results.len();
542                        assert!(results_len <= 16);
543                        results.resize(16, fblock::VsliceRange { allocated: false, count: 0 });
544                        responder.send(
545                            zx::sys::ZX_OK,
546                            &results.try_into().unwrap(),
547                            results_len as u64,
548                        )?;
549                    }
550                    Err(s) => {
551                        responder.send(
552                            s.into_raw(),
553                            &[fblock::VsliceRange { allocated: false, count: 0 }; 16],
554                            0,
555                        )?;
556                    }
557                }
558            }
559            fblock::BlockRequest::GetVolumeInfo { responder, .. } => {
560                match self.session_manager().get_volume_info().await {
561                    Ok((manager_info, volume_info)) => {
562                        responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
563                    }
564                    Err(s) => responder.send(s.into_raw(), None, None)?,
565                }
566            }
567            fblock::BlockRequest::Extend { responder, start_slice, slice_count } => {
568                responder.send(
569                    zx::Status::from(self.session_manager().extend(start_slice, slice_count).await)
570                        .into_raw(),
571                )?;
572            }
573            fblock::BlockRequest::Shrink { responder, start_slice, slice_count } => {
574                responder.send(
575                    zx::Status::from(self.session_manager().shrink(start_slice, slice_count).await)
576                        .into_raw(),
577                )?;
578            }
579            fblock::BlockRequest::Destroy { responder, .. } => {
580                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
581            }
582        }
583        Ok(None)
584    }
585
586    fn device_info(&self) -> Cow<'_, DeviceInfo> {
587        self.session_manager().get_info()
588    }
589}
590
591struct SessionHelper<SM: SessionManager> {
592    orchestrator: Arc<SM::Orchestrator>,
593    offset_map: OffsetMap,
594    block_size: u32,
595    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
596    vmos: Mutex<BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)>>,
597}
598
599struct VmoMapping {
600    base: usize,
601    size: usize,
602}
603
604impl VmoMapping {
605    fn new(vmo: &zx::Vmo) -> Result<Arc<Self>, zx::Status> {
606        let size = vmo.get_size().unwrap() as usize;
607        Ok(Arc::new(Self {
608            base: fuchsia_runtime::vmar_root_self()
609                .map(0, vmo, 0, size, zx::VmarFlags::PERM_WRITE | zx::VmarFlags::PERM_READ)
610                .inspect_err(|error| {
611                    log::warn!(error:?, size; "VmoMapping: unable to map VMO");
612                })?,
613            size,
614        }))
615    }
616}
617
618impl Drop for VmoMapping {
619    fn drop(&mut self) {
620        // SAFETY: We mapped this in `VmoMapping::new`.
621        unsafe {
622            let _ = fuchsia_runtime::vmar_root_self().unmap(self.base, self.size);
623        }
624    }
625}
626
627impl<SM: SessionManager> SessionHelper<SM> {
628    fn new(
629        orchestrator: Arc<SM::Orchestrator>,
630        offset_map: OffsetMap,
631        block_size: u32,
632    ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
633        let (peer_fifo, fifo) = zx::Fifo::create(16)?;
634        Ok((Self { orchestrator, offset_map, block_size, peer_fifo, vmos: Mutex::default() }, fifo))
635    }
636
637    fn session_manager(&self) -> &SM {
638        self.orchestrator.as_ref().borrow()
639    }
640
641    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
642        match request {
643            fblock::SessionRequest::GetFifo { responder } => {
644                let rights = zx::Rights::TRANSFER
645                    | zx::Rights::READ
646                    | zx::Rights::WRITE
647                    | zx::Rights::SIGNAL
648                    | zx::Rights::WAIT;
649                match self.peer_fifo.duplicate_handle(rights) {
650                    Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
651                    Err(s) => responder.send(Err(s.into_raw()))?,
652                }
653                Ok(())
654            }
655            fblock::SessionRequest::AttachVmo { vmo, responder } => {
656                let vmo = Arc::new(vmo);
657                let vmo_id = {
658                    let mut vmos = self.vmos.lock();
659                    if vmos.len() == u16::MAX as usize {
660                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
661                        return Ok(());
662                    } else {
663                        let vmo_id = match vmos.last_entry() {
664                            None => 1,
665                            Some(o) => {
666                                o.key().checked_add(1).unwrap_or_else(|| {
667                                    let mut vmo_id = 1;
668                                    // Find the first gap...
669                                    for (&id, _) in &*vmos {
670                                        if id > vmo_id {
671                                            break;
672                                        }
673                                        vmo_id = id + 1;
674                                    }
675                                    vmo_id
676                                })
677                            }
678                        };
679                        vmos.insert(vmo_id, (vmo.clone(), None));
680                        vmo_id
681                    }
682                };
683                SM::on_attach_vmo(self.orchestrator.clone(), &vmo).await?;
684                responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
685                Ok(())
686            }
687            fblock::SessionRequest::Close { responder } => {
688                responder.send(Ok(()))?;
689                Err(anyhow!("Closed"))
690            }
691        }
692    }
693
694    /// Decodes `request`.
695    fn decode_fifo_request(
696        &self,
697        session: SM::Session,
698        request: &BlockFifoRequest,
699    ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
700        let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
701
702        let request_bytes = request.length as u64 * self.block_size as u64;
703
704        let mut operation = BlockOpcode::from_primitive(request.command.opcode)
705            .ok_or(zx::Status::INVALID_ARGS)
706            .and_then(|code| {
707                if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
708                    if code != BlockOpcode::Read {
709                        return Err(zx::Status::INVALID_ARGS);
710                    }
711                    if !SM::SUPPORTS_DECOMPRESSION {
712                        return Err(zx::Status::NOT_SUPPORTED);
713                    }
714                }
715                if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
716                    if request.length == 0 {
717                        return Err(zx::Status::INVALID_ARGS);
718                    }
719                    // Make sure the end offsets won't wrap.
720                    if request.dev_offset.checked_add(request.length as u64).is_none()
721                        || (code != BlockOpcode::Trim
722                            && request_bytes.checked_add(request.vmo_offset).is_none())
723                    {
724                        return Err(zx::Status::OUT_OF_RANGE);
725                    }
726                }
727                Ok(match code {
728                    BlockOpcode::Read => Operation::Read {
729                        device_block_offset: request.dev_offset,
730                        block_count: request.length,
731                        _unused: 0,
732                        vmo_offset: request
733                            .vmo_offset
734                            .checked_mul(self.block_size as u64)
735                            .ok_or(zx::Status::OUT_OF_RANGE)?,
736                        options: ReadOptions {
737                            inline_crypto: InlineCryptoOptions {
738                                is_enabled: flags.contains(BlockIoFlag::INLINE_ENCRYPTION_ENABLED),
739                                dun: request.dun,
740                                slot: request.slot,
741                            },
742                        },
743                    },
744                    BlockOpcode::Write => {
745                        let mut options = WriteOptions {
746                            inline_crypto: InlineCryptoOptions {
747                                is_enabled: flags.contains(BlockIoFlag::INLINE_ENCRYPTION_ENABLED),
748                                dun: request.dun,
749                                slot: request.slot,
750                            },
751                            ..WriteOptions::default()
752                        };
753                        if flags.contains(BlockIoFlag::FORCE_ACCESS) {
754                            options.flags |= WriteFlags::FORCE_ACCESS;
755                        }
756                        if flags.contains(BlockIoFlag::PRE_BARRIER) {
757                            options.flags |= WriteFlags::PRE_BARRIER;
758                        }
759                        Operation::Write {
760                            device_block_offset: request.dev_offset,
761                            block_count: request.length,
762                            _unused: 0,
763                            options,
764                            vmo_offset: request
765                                .vmo_offset
766                                .checked_mul(self.block_size as u64)
767                                .ok_or(zx::Status::OUT_OF_RANGE)?,
768                        }
769                    }
770                    BlockOpcode::Flush => Operation::Flush,
771                    BlockOpcode::Trim => Operation::Trim {
772                        device_block_offset: request.dev_offset,
773                        block_count: request.length,
774                    },
775                    BlockOpcode::CloseVmo => Operation::CloseVmo,
776                })
777            });
778
779        let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
780            GroupOrRequest::Group(request.group)
781        } else {
782            GroupOrRequest::Request(request.reqid)
783        };
784
785        let mut active_requests = self.session_manager().active_requests().0.lock();
786        let mut request_id = None;
787
788        // Multiple Block I/O request may be sent as a group.
789        // Notes:
790        // - the group is identified by the group id in the request
791        // - if using groups, a response will not be sent unless `BlockIoFlag::GROUP_LAST`
792        //   flag is set.
793        // - when processing a request of a group fails, subsequent requests of that
794        //   group will not be processed.
795        // - decompression is a special case, see block-fifo.h for semantics.
796        //
797        // Refer to sdk/fidl/fuchsia.hardware.block.driver/block.fidl for details.
798        if group_or_request.is_group() {
799            // Search for an existing entry that matches this group.  NOTE: This is a potentially
800            // expensive way to find a group (it's iterating over all slots in the active-requests
801            // slab).  This can be optimised easily should we need to.
802            for (key, group) in &mut active_requests.requests {
803                if group.group_or_request == group_or_request {
804                    if group.req_id.is_some() {
805                        // We have already received a request tagged as last.
806                        if group.status == zx::Status::OK {
807                            group.status = zx::Status::INVALID_ARGS;
808                        }
809                        // Ignore this request.
810                        return Err(None);
811                    }
812                    // See if this is a continuation of a decompressed read.
813                    if group.status == zx::Status::OK
814                        && let Some(info) = &mut group.decompression_info
815                    {
816                        if let Ok(Operation::Read {
817                            device_block_offset,
818                            mut block_count,
819                            options,
820                            vmo_offset: 0,
821                            ..
822                        }) = operation
823                        {
824                            let remaining_bytes = info
825                                .compressed_range
826                                .end
827                                .next_multiple_of(self.block_size as usize)
828                                as u64
829                                - info.bytes_so_far;
830                            if !flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD)
831                                || request.total_compressed_bytes != 0
832                                || request.uncompressed_bytes != 0
833                                || request.compressed_prefix_bytes != 0
834                                || (flags.contains(BlockIoFlag::GROUP_LAST)
835                                    && info.bytes_so_far + request_bytes
836                                        < info.compressed_range.end as u64)
837                                || (!flags.contains(BlockIoFlag::GROUP_LAST)
838                                    && request_bytes >= remaining_bytes)
839                            {
840                                group.status = zx::Status::INVALID_ARGS;
841                            } else {
842                                // We are tolerant of `block_count` being more than we actually
843                                // need.  This can happen if the client is working with a larger
844                                // block size than the device block size.  For example, if Blobfs
845                                // has a 8192 byte block size, but the device might has a 512 byte
846                                // block size, it can ask for a multiple of 16 blocks, when fewer
847                                // than that might actually be required to hold the compressed data.
848                                // It is easier for us to tolerate this here than to get Blobfs to
849                                // change to pass only the blocks that are required.
850                                if request_bytes > remaining_bytes {
851                                    block_count = (remaining_bytes / self.block_size as u64) as u32;
852                                }
853
854                                operation = Ok(Operation::ContinueDecompressedRead {
855                                    offset: info.bytes_so_far,
856                                    device_block_offset,
857                                    block_count,
858                                    options,
859                                });
860
861                                info.bytes_so_far += block_count as u64 * self.block_size as u64;
862                            }
863                        } else {
864                            group.status = zx::Status::INVALID_ARGS;
865                        }
866                    }
867                    if flags.contains(BlockIoFlag::GROUP_LAST) {
868                        group.req_id = Some(request.reqid);
869                        // If the group has had an error, there is no point trying to issue this
870                        // request.
871                        if group.status != zx::Status::OK {
872                            operation = Err(group.status);
873                        }
874                    } else if group.status != zx::Status::OK {
875                        // The group has already encountered an error, so there is no point trying
876                        // to issue this request.
877                        return Err(None);
878                    }
879                    request_id = Some(RequestId(key));
880                    group.count += 1;
881                    break;
882                }
883            }
884        }
885
886        let is_single_request =
887            !flags.contains(BlockIoFlag::GROUP_ITEM) || flags.contains(BlockIoFlag::GROUP_LAST);
888
889        let mut decompression_info = None;
890        let vmo = match operation {
891            Ok(Operation::Read {
892                device_block_offset,
893                mut block_count,
894                options,
895                vmo_offset,
896                ..
897            }) => match self.vmos.lock().get_mut(&request.vmoid) {
898                Some((vmo, mapping)) => {
899                    if flags.contains(BlockIoFlag::DECOMPRESS_WITH_ZSTD) {
900                        let compressed_range = request.compressed_prefix_bytes as usize
901                            ..request.compressed_prefix_bytes as usize
902                                + request.total_compressed_bytes as usize;
903                        let required_buffer_size =
904                            compressed_range.end.next_multiple_of(self.block_size as usize);
905
906                        // Validate the initial decompression request.
907                        if compressed_range.start >= compressed_range.end
908                            || vmo_offset.checked_add(request.uncompressed_bytes as u64).is_none()
909                            || (is_single_request && request_bytes < compressed_range.end as u64)
910                            || (!is_single_request && request_bytes >= required_buffer_size as u64)
911                        {
912                            Err(zx::Status::INVALID_ARGS)
913                        } else {
914                            // We are tolerant of `block_count` being more than we actually need.
915                            // This can happen if the client is working in a larger block size than
916                            // the device block size.  For example, Blobfs has a 8192 byte block
917                            // size, but the device might have a 512 byte block size.  It is easier
918                            // for us to tolerate this here than to get Blobfs to change to pass
919                            // only the blocks that are required.
920                            let bytes_so_far = if request_bytes > required_buffer_size as u64 {
921                                block_count =
922                                    (required_buffer_size / self.block_size as usize) as u32;
923                                required_buffer_size as u64
924                            } else {
925                                request_bytes
926                            };
927
928                            // To decompress, we need to have the target VMO mapped (cached).
929                            match mapping {
930                                Some(mapping) => Ok(mapping.clone()),
931                                None => {
932                                    VmoMapping::new(&vmo).inspect(|m| *mapping = Some(m.clone()))
933                                }
934                            }
935                            .and_then(|mapping| {
936                                // Make sure the `vmo_offset` and `uncompressed_bytes` are within
937                                // range.
938                                if vmo_offset
939                                    .checked_add(request.uncompressed_bytes as u64)
940                                    .is_some_and(|end| end <= mapping.size as u64)
941                                {
942                                    Ok(mapping)
943                                } else {
944                                    Err(zx::Status::OUT_OF_RANGE)
945                                }
946                            })
947                            .map(|mapping| {
948                                // Convert the operation into a `StartDecompressedRead`
949                                // operation. For non-fragmented requests, this will be the only
950                                // operation, but if it's a fragmented read,
951                                // `ContinueDecompressedRead` operations will follow.
952                                operation = Ok(Operation::StartDecompressedRead {
953                                    required_buffer_size,
954                                    device_block_offset,
955                                    block_count,
956                                    options,
957                                });
958                                // Record sufficient information so that we can decompress when all
959                                // the requests complete.
960                                decompression_info = Some(DecompressionInfo {
961                                    compressed_range,
962                                    bytes_so_far,
963                                    mapping,
964                                    uncompressed_range: vmo_offset
965                                        ..vmo_offset + request.uncompressed_bytes as u64,
966                                    buffer: None,
967                                });
968                                None
969                            })
970                        }
971                    } else {
972                        Ok(Some(vmo.clone()))
973                    }
974                }
975                None => Err(zx::Status::IO),
976            },
977            Ok(Operation::Write { .. }) => self
978                .vmos
979                .lock()
980                .get(&request.vmoid)
981                .cloned()
982                .map_or(Err(zx::Status::IO), |(vmo, _)| Ok(Some(vmo))),
983            Ok(Operation::CloseVmo) => {
984                self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |(vmo, _)| {
985                    let vmo_clone = vmo.clone();
986                    // Make sure the VMO is dropped after all current Epoch guards have been
987                    // dropped.
988                    Epoch::global().defer(move || drop(vmo_clone));
989                    Ok(Some(vmo))
990                })
991            }
992            _ => Ok(None),
993        }
994        .unwrap_or_else(|e| {
995            operation = Err(e);
996            None
997        });
998
999        let trace_flow_id = NonZero::new(request.trace_flow_id);
1000        let request_id = request_id.unwrap_or_else(|| {
1001            RequestId(active_requests.requests.insert(ActiveRequest {
1002                session,
1003                group_or_request,
1004                trace_flow_id,
1005                _epoch_guard: Epoch::global().guard(),
1006                status: zx::Status::OK,
1007                count: 1,
1008                req_id: is_single_request.then_some(request.reqid),
1009                decompression_info,
1010            }))
1011        });
1012
1013        Ok(DecodedRequest {
1014            request_id,
1015            trace_flow_id,
1016            operation: operation.map_err(|status| {
1017                active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
1018            })?,
1019            vmo,
1020        })
1021    }
1022
1023    fn take_vmos(&self) -> BTreeMap<u16, (Arc<zx::Vmo>, Option<Arc<VmoMapping>>)> {
1024        std::mem::take(&mut *self.vmos.lock())
1025    }
1026
1027    /// Maps the request and returns the mapped request with an optional remainder.
1028    fn map_request(
1029        &self,
1030        mut request: DecodedRequest,
1031        active_request: &mut ActiveRequest<SM::Session>,
1032    ) -> Result<(DecodedRequest, Option<DecodedRequest>), zx::Status> {
1033        if active_request.status != zx::Status::OK {
1034            return Err(zx::Status::BAD_STATE);
1035        }
1036        let mapping = self.offset_map.mapping();
1037        match (mapping, request.operation.blocks()) {
1038            (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
1039                return Err(zx::Status::OUT_OF_RANGE);
1040            }
1041            _ => {}
1042        }
1043        let remainder = request.operation.map(
1044            self.offset_map.mapping(),
1045            self.offset_map.max_transfer_blocks(),
1046            self.block_size,
1047        );
1048        if remainder.is_some() {
1049            active_request.count += 1;
1050        }
1051        static CACHE: AtomicU64 = AtomicU64::new(0);
1052        if let Some(context) =
1053            fuchsia_trace::TraceCategoryContext::acquire_cached("storage", &CACHE)
1054        {
1055            use fuchsia_trace::ArgValue;
1056            let trace_args = [
1057                ArgValue::of("request_id", request.request_id.0),
1058                ArgValue::of("opcode", request.operation.trace_label()),
1059            ];
1060            let _scope =
1061                fuchsia_trace::duration("storage", "block_server::start_transaction", &trace_args);
1062            if let Some(trace_flow_id) = active_request.trace_flow_id {
1063                fuchsia_trace::flow_step(
1064                    &context,
1065                    "block_server::start_transaction",
1066                    trace_flow_id.get().into(),
1067                    &[],
1068                );
1069            }
1070        }
1071        let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
1072        Ok((request, remainder))
1073    }
1074
1075    /// Drops all requests for which `pred` is true.
1076    ///
1077    /// NOTE: This should only be called once we are certain that the requests will not be
1078    /// completed asynchronously  Otherwise, requests might be completed twice.
1079    fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
1080        self.session_manager().active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
1081    }
1082
1083    /// Closes all grouped requests for which `pred` is true and which are held open pending the
1084    /// completion of their group.
1085    ///
1086    /// Normally, a request is dropped from ActiveRequests when it is completed.  However, if a
1087    /// request is part of a group, it will not be dropped until a request with GROUP_LAST arrives.
1088    /// If we're shutting down a session, the client may not ever send the GROUP_LAST, so we need to
1089    /// be sure to close these grouped requests.
1090    ///
1091    /// This is called during session shutdown in situations where [`Self::drop_active_requests`]
1092    /// cannot be used (e.g. for the callback interface, which hands off the responsibility of
1093    /// completing requests to its concrete implementation and cannot control when requests are
1094    /// completed relative to session shutdown).
1095    fn close_active_groups(&self, pred: impl Fn(&SM::Session) -> bool) {
1096        self.session_manager().active_requests().0.lock().requests.retain(|_, request| {
1097            if !pred(&request.session) || request.req_id.is_some() {
1098                return true;
1099            }
1100            // Mark the group as completed, and immediately drop any which have no outstanding
1101            // requests (since they will otherwise never be dropped).
1102            request.req_id = Some(u32::MAX);
1103            request.count > 0
1104        });
1105    }
1106}
1107
1108#[repr(transparent)]
1109#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Ord, PartialOrd)]
1110pub struct RequestId(usize);
1111
1112#[derive(Clone, Debug)]
1113struct DecodedRequest {
1114    request_id: RequestId,
1115    trace_flow_id: TraceFlowId,
1116    operation: Operation,
1117    vmo: Option<Arc<zx::Vmo>>,
1118}
1119
1120/// cbindgen:no-export
1121pub type WriteFlags = block_protocol::WriteFlags;
1122pub type WriteOptions = block_protocol::WriteOptions;
1123pub type ReadOptions = block_protocol::ReadOptions;
1124pub type InlineCryptoOptions = block_protocol::InlineCryptoOptions;
1125
1126#[repr(C)]
1127#[derive(Clone, Debug, PartialEq, Eq)]
1128pub enum Operation {
1129    // NOTE: On the C++ side, this ends up as a union and, for efficiency reasons, there is code
1130    // that assumes that some fields for reads and writes (and possibly trim) line-up (e.g. common
1131    // code can read `device_block_offset` from the read variant and then assume it's valid for the
1132    // write variant).
1133    Read {
1134        device_block_offset: u64,
1135        block_count: u32,
1136        _unused: u32,
1137        vmo_offset: u64,
1138        options: ReadOptions,
1139    },
1140    Write {
1141        device_block_offset: u64,
1142        block_count: u32,
1143        _unused: u32,
1144        vmo_offset: u64,
1145        options: WriteOptions,
1146    },
1147    Flush,
1148    Trim {
1149        device_block_offset: u64,
1150        block_count: u32,
1151    },
1152    /// This will never be seen by the C interface.
1153    CloseVmo,
1154    StartDecompressedRead {
1155        required_buffer_size: usize,
1156        device_block_offset: u64,
1157        block_count: u32,
1158        options: ReadOptions,
1159    },
1160    ContinueDecompressedRead {
1161        offset: u64,
1162        device_block_offset: u64,
1163        block_count: u32,
1164        options: ReadOptions,
1165    },
1166}
1167
1168impl Operation {
1169    fn trace_label(&self) -> &'static str {
1170        match self {
1171            Operation::Read { .. } => "read",
1172            Operation::Write { .. } => "write",
1173            Operation::Flush { .. } => "flush",
1174            Operation::Trim { .. } => "trim",
1175            Operation::CloseVmo { .. } => "close_vmo",
1176            Operation::StartDecompressedRead { .. } => "start_decompressed_read",
1177            Operation::ContinueDecompressedRead { .. } => "continue_decompressed_read",
1178        }
1179    }
1180
1181    /// Returns (offset, length).
1182    fn blocks(&self) -> Option<(u64, u32)> {
1183        match self {
1184            Operation::Read { device_block_offset, block_count, .. }
1185            | Operation::Write { device_block_offset, block_count, .. }
1186            | Operation::Trim { device_block_offset, block_count, .. } => {
1187                Some((*device_block_offset, *block_count))
1188            }
1189            _ => None,
1190        }
1191    }
1192
1193    /// Returns mutable references to (offset, length).
1194    fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
1195        match self {
1196            Operation::Read { device_block_offset, block_count, .. }
1197            | Operation::Write { device_block_offset, block_count, .. }
1198            | Operation::Trim { device_block_offset, block_count, .. } => {
1199                Some((device_block_offset, block_count))
1200            }
1201            _ => None,
1202        }
1203    }
1204
1205    /// Maps the operation using `mapping` and returns the remainder.  `mapping` *must* overlap the
1206    /// start of the operation.
1207    fn map(
1208        &mut self,
1209        mapping: Option<&BlockOffsetMapping>,
1210        max_blocks: Option<NonZero<u32>>,
1211        block_size: u32,
1212    ) -> Option<Self> {
1213        let mut max = match self {
1214            Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
1215            _ => None,
1216        };
1217        let (offset, length) = self.blocks_mut()?;
1218        let orig_offset = *offset;
1219        if let Some(mapping) = mapping {
1220            let delta = *offset - mapping.source_block_offset;
1221            debug_assert!(*offset - mapping.source_block_offset < mapping.length);
1222            *offset = mapping.target_block_offset + delta;
1223            let mapping_max = mapping.target_block_offset + mapping.length - *offset;
1224            max = match max {
1225                None => Some(mapping_max),
1226                Some(m) => Some(std::cmp::min(m, mapping_max)),
1227            };
1228        };
1229        if let Some(max) = max {
1230            if *length as u64 > max {
1231                let rem = (*length as u64 - max) as u32;
1232                *length = max as u32;
1233                return Some(match self {
1234                    Operation::Read {
1235                        device_block_offset: _,
1236                        block_count: _,
1237                        vmo_offset,
1238                        _unused,
1239                        options,
1240                    } => {
1241                        let mut options = *options;
1242                        options.inline_crypto.dun += max as u32;
1243                        Operation::Read {
1244                            device_block_offset: orig_offset + max,
1245                            block_count: rem,
1246                            vmo_offset: *vmo_offset + max * block_size as u64,
1247                            _unused: *_unused,
1248                            options: options,
1249                        }
1250                    }
1251                    Operation::Write {
1252                        device_block_offset: _,
1253                        block_count: _,
1254                        _unused,
1255                        vmo_offset,
1256                        options,
1257                    } => {
1258                        let mut options = *options;
1259                        options.inline_crypto.dun += max as u32;
1260                        Operation::Write {
1261                            device_block_offset: orig_offset + max,
1262                            block_count: rem,
1263                            _unused: *_unused,
1264                            vmo_offset: *vmo_offset + max * block_size as u64,
1265                            options: options,
1266                        }
1267                    }
1268                    Operation::Trim { device_block_offset: _, block_count: _ } => {
1269                        Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
1270                    }
1271                    _ => unreachable!(),
1272                });
1273            }
1274        }
1275        None
1276    }
1277
1278    /// Returns true if the specified write flags are set.
1279    pub fn has_write_flag(&self, value: WriteFlags) -> bool {
1280        if let Operation::Write { options, .. } = self {
1281            options.flags.contains(value)
1282        } else {
1283            false
1284        }
1285    }
1286
1287    /// Removes `value` from the request's write flags and returns true if the flag was set.
1288    pub fn take_write_flag(&mut self, value: WriteFlags) -> bool {
1289        if let Operation::Write { options, .. } = self {
1290            let result = options.flags.contains(value);
1291            options.flags.remove(value);
1292            result
1293        } else {
1294            false
1295        }
1296    }
1297}
1298
1299#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
1300pub enum GroupOrRequest {
1301    Group(u16),
1302    Request(u32),
1303}
1304
1305impl GroupOrRequest {
1306    fn is_group(&self) -> bool {
1307        matches!(self, Self::Group(_))
1308    }
1309
1310    fn group_id(&self) -> Option<u16> {
1311        match self {
1312            Self::Group(id) => Some(*id),
1313            Self::Request(_) => None,
1314        }
1315    }
1316}
1317
1318#[cfg(test)]
1319mod tests {
1320    use super::{
1321        BlockOffsetMapping, BlockServer, DeviceInfo, FIFO_MAX_REQUESTS, Operation, PartitionInfo,
1322        TraceFlowId,
1323    };
1324    use assert_matches::assert_matches;
1325    use block_protocol::{
1326        BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, InlineCryptoOptions, ReadOptions,
1327        WriteFlags, WriteOptions,
1328    };
1329    use fidl_fuchsia_storage_block::{BlockIoFlag, BlockOpcode};
1330    use fuchsia_sync::Mutex;
1331    use futures::FutureExt as _;
1332    use futures::channel::oneshot;
1333    use futures::future::BoxFuture;
1334    use std::borrow::Cow;
1335    use std::future::poll_fn;
1336    use std::num::NonZero;
1337    use std::pin::pin;
1338    use std::sync::Arc;
1339    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
1340    use std::task::{Context, Poll};
1341    use zx::HandleBased as _;
1342    use {fidl_fuchsia_storage_block as fblock, fuchsia_async as fasync};
1343
1344    #[derive(Default)]
1345    struct MockInterface {
1346        read_hook: Option<
1347            Box<
1348                dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
1349                    + Send
1350                    + Sync,
1351            >,
1352        >,
1353        write_hook:
1354            Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
1355        barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
1356    }
1357
1358    impl super::async_interface::Interface for MockInterface {
1359        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1360            Ok(())
1361        }
1362
1363        fn get_info(&self) -> Cow<'_, DeviceInfo> {
1364            Cow::Owned(test_device_info())
1365        }
1366
1367        async fn read(
1368            &self,
1369            device_block_offset: u64,
1370            block_count: u32,
1371            vmo: &Arc<zx::Vmo>,
1372            vmo_offset: u64,
1373            _opts: ReadOptions,
1374            _trace_flow_id: TraceFlowId,
1375        ) -> Result<(), zx::Status> {
1376            if let Some(read_hook) = &self.read_hook {
1377                read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1378            } else {
1379                unimplemented!();
1380            }
1381        }
1382
1383        async fn write(
1384            &self,
1385            device_block_offset: u64,
1386            _block_count: u32,
1387            _vmo: &Arc<zx::Vmo>,
1388            _vmo_offset: u64,
1389            opts: WriteOptions,
1390            _trace_flow_id: TraceFlowId,
1391        ) -> Result<(), zx::Status> {
1392            if opts.flags.contains(WriteFlags::PRE_BARRIER)
1393                && let Some(barrier_hook) = &self.barrier_hook
1394            {
1395                barrier_hook()?;
1396            }
1397            if let Some(write_hook) = &self.write_hook {
1398                write_hook(device_block_offset).await
1399            } else {
1400                unimplemented!();
1401            }
1402        }
1403
1404        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1405            Ok(())
1406        }
1407
1408        async fn trim(
1409            &self,
1410            _device_block_offset: u64,
1411            _block_count: u32,
1412            _trace_flow_id: TraceFlowId,
1413        ) -> Result<(), zx::Status> {
1414            unreachable!();
1415        }
1416
1417        async fn get_volume_info(
1418            &self,
1419        ) -> Result<(fblock::VolumeManagerInfo, fblock::VolumeInfo), zx::Status> {
1420            // Hang forever for the test_requests_dont_block_sessions test.
1421            let () = std::future::pending().await;
1422            unreachable!();
1423        }
1424    }
1425
1426    const BLOCK_SIZE: u32 = 512;
1427    const MAX_TRANSFER_BLOCKS: u32 = 10;
1428
1429    fn test_device_info() -> DeviceInfo {
1430        DeviceInfo::Partition(PartitionInfo {
1431            device_flags: fblock::DeviceFlag::READONLY
1432                | fblock::DeviceFlag::BARRIER_SUPPORT
1433                | fblock::DeviceFlag::FUA_SUPPORT,
1434            max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1435            block_range: Some(0..100),
1436            type_guid: [1; 16],
1437            instance_guid: [2; 16],
1438            name: "foo".to_string(),
1439            flags: 0xabcd,
1440        })
1441    }
1442
1443    #[fuchsia::test]
1444    async fn test_barriers_ordering() {
1445        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1446        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1447        let barrier_called = Arc::new(AtomicBool::new(false));
1448
1449        futures::join!(
1450            async move {
1451                let barrier_called_clone = barrier_called.clone();
1452                let block_server = BlockServer::new(
1453                    BLOCK_SIZE,
1454                    Arc::new(MockInterface {
1455                        barrier_hook: Some(Box::new(move || {
1456                            barrier_called.store(true, Ordering::Relaxed);
1457                            Ok(())
1458                        })),
1459                        write_hook: Some(Box::new(move |device_block_offset| {
1460                            let barrier_called = barrier_called_clone.clone();
1461                            Box::pin(async move {
1462                                // The sleep allows the server to reorder the fifo requests.
1463                                if device_block_offset % 2 == 0 {
1464                                    fasync::Timer::new(fasync::MonotonicInstant::after(
1465                                        zx::MonotonicDuration::from_millis(200),
1466                                    ))
1467                                    .await;
1468                                }
1469                                assert!(barrier_called.load(Ordering::Relaxed));
1470                                Ok(())
1471                            })
1472                        })),
1473                        ..MockInterface::default()
1474                    }),
1475                );
1476                block_server.handle_requests(stream).await.unwrap();
1477            },
1478            async move {
1479                let (session_proxy, server) = fidl::endpoints::create_proxy();
1480
1481                proxy.open_session(server).unwrap();
1482
1483                let vmo_id = session_proxy
1484                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1485                    .await
1486                    .unwrap()
1487                    .unwrap();
1488                assert_ne!(vmo_id.id, 0);
1489
1490                let mut fifo =
1491                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1492                let (mut reader, mut writer) = fifo.async_io();
1493
1494                writer
1495                    .write_entries(&BlockFifoRequest {
1496                        command: BlockFifoCommand {
1497                            opcode: BlockOpcode::Write.into_primitive(),
1498                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1499                            ..Default::default()
1500                        },
1501                        vmoid: vmo_id.id,
1502                        dev_offset: 0,
1503                        length: 5,
1504                        vmo_offset: 6,
1505                        ..Default::default()
1506                    })
1507                    .await
1508                    .unwrap();
1509
1510                for i in 0..10 {
1511                    writer
1512                        .write_entries(&BlockFifoRequest {
1513                            command: BlockFifoCommand {
1514                                opcode: BlockOpcode::Write.into_primitive(),
1515                                ..Default::default()
1516                            },
1517                            vmoid: vmo_id.id,
1518                            dev_offset: i + 1,
1519                            length: 5,
1520                            vmo_offset: 6,
1521                            ..Default::default()
1522                        })
1523                        .await
1524                        .unwrap();
1525                }
1526                for _ in 0..11 {
1527                    let mut response = BlockFifoResponse::default();
1528                    reader.read_entries(&mut response).await.unwrap();
1529                    assert_eq!(response.status, zx::sys::ZX_OK);
1530                }
1531
1532                std::mem::drop(proxy);
1533            }
1534        );
1535    }
1536
1537    #[fuchsia::test]
1538    async fn test_info() {
1539        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1540
1541        futures::join!(
1542            async {
1543                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1544                block_server.handle_requests(stream).await.unwrap();
1545            },
1546            async {
1547                let expected_info = test_device_info();
1548                let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1549                    info
1550                } else {
1551                    unreachable!()
1552                };
1553
1554                let block_info = proxy.get_info().await.unwrap().unwrap();
1555                assert_eq!(
1556                    block_info.block_count,
1557                    partition_info.block_range.as_ref().unwrap().end
1558                        - partition_info.block_range.as_ref().unwrap().start
1559                );
1560                assert_eq!(
1561                    block_info.flags,
1562                    fblock::DeviceFlag::READONLY
1563                        | fblock::DeviceFlag::ZSTD_DECOMPRESSION_SUPPORT
1564                        | fblock::DeviceFlag::BARRIER_SUPPORT
1565                        | fblock::DeviceFlag::FUA_SUPPORT
1566                );
1567
1568                assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1569
1570                let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1571                assert_eq!(status, zx::sys::ZX_OK);
1572                assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1573
1574                let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1575                assert_eq!(status, zx::sys::ZX_OK);
1576                assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1577
1578                let (status, name) = proxy.get_name().await.unwrap();
1579                assert_eq!(status, zx::sys::ZX_OK);
1580                assert_eq!(name.as_ref(), Some(&partition_info.name));
1581
1582                let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1583                assert_eq!(metadata.name, name);
1584                assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1585                assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1586                assert_eq!(
1587                    metadata.start_block_offset,
1588                    Some(partition_info.block_range.as_ref().unwrap().start)
1589                );
1590                assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1591                assert_eq!(metadata.flags, Some(partition_info.flags));
1592
1593                std::mem::drop(proxy);
1594            }
1595        );
1596    }
1597
1598    #[fuchsia::test]
1599    async fn test_attach_vmo() {
1600        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1601
1602        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1603        let koid = vmo.koid().unwrap();
1604
1605        futures::join!(
1606            async {
1607                let block_server = BlockServer::new(
1608                    BLOCK_SIZE,
1609                    Arc::new(MockInterface {
1610                        read_hook: Some(Box::new(move |_, _, vmo, _| {
1611                            assert_eq!(vmo.koid().unwrap(), koid);
1612                            Box::pin(async { Ok(()) })
1613                        })),
1614                        ..MockInterface::default()
1615                    }),
1616                );
1617                block_server.handle_requests(stream).await.unwrap();
1618            },
1619            async move {
1620                let (session_proxy, server) = fidl::endpoints::create_proxy();
1621
1622                proxy.open_session(server).unwrap();
1623
1624                let vmo_id = session_proxy
1625                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1626                    .await
1627                    .unwrap()
1628                    .unwrap();
1629                assert_ne!(vmo_id.id, 0);
1630
1631                let mut fifo =
1632                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1633                let (mut reader, mut writer) = fifo.async_io();
1634
1635                // Keep attaching VMOs until we eventually hit the maximum.
1636                let mut count = 1;
1637                loop {
1638                    match session_proxy
1639                        .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1640                        .await
1641                        .unwrap()
1642                    {
1643                        Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1644                        Err(e) => {
1645                            assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1646                            break;
1647                        }
1648                    }
1649
1650                    // Only test every 10 to keep test time down.
1651                    if count % 10 == 0 {
1652                        writer
1653                            .write_entries(&BlockFifoRequest {
1654                                command: BlockFifoCommand {
1655                                    opcode: BlockOpcode::Read.into_primitive(),
1656                                    ..Default::default()
1657                                },
1658                                vmoid: vmo_id.id,
1659                                length: 1,
1660                                ..Default::default()
1661                            })
1662                            .await
1663                            .unwrap();
1664
1665                        let mut response = BlockFifoResponse::default();
1666                        reader.read_entries(&mut response).await.unwrap();
1667                        assert_eq!(response.status, zx::sys::ZX_OK);
1668                    }
1669
1670                    count += 1;
1671                }
1672
1673                assert_eq!(count, u16::MAX as u64);
1674
1675                // Detach the original VMO, and make sure we can then attach another one.
1676                writer
1677                    .write_entries(&BlockFifoRequest {
1678                        command: BlockFifoCommand {
1679                            opcode: BlockOpcode::CloseVmo.into_primitive(),
1680                            ..Default::default()
1681                        },
1682                        vmoid: vmo_id.id,
1683                        ..Default::default()
1684                    })
1685                    .await
1686                    .unwrap();
1687
1688                let mut response = BlockFifoResponse::default();
1689                reader.read_entries(&mut response).await.unwrap();
1690                assert_eq!(response.status, zx::sys::ZX_OK);
1691
1692                let new_vmo_id = session_proxy
1693                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1694                    .await
1695                    .unwrap()
1696                    .unwrap();
1697                // It should reuse the same ID.
1698                assert_eq!(new_vmo_id.id, vmo_id.id);
1699
1700                std::mem::drop(proxy);
1701            }
1702        );
1703    }
1704
1705    #[fuchsia::test]
1706    async fn test_close() {
1707        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1708
1709        let mut server = std::pin::pin!(
1710            async {
1711                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1712                block_server.handle_requests(stream).await.unwrap();
1713            }
1714            .fuse()
1715        );
1716
1717        let mut client = std::pin::pin!(
1718            async {
1719                let (session_proxy, server) = fidl::endpoints::create_proxy();
1720
1721                proxy.open_session(server).unwrap();
1722
1723                // Dropping the proxy should not cause the session to terminate because the session is
1724                // still live.
1725                std::mem::drop(proxy);
1726
1727                session_proxy.close().await.unwrap().unwrap();
1728
1729                // Keep the session alive.  Calling `close` should cause the server to terminate.
1730                let _: () = std::future::pending().await;
1731            }
1732            .fuse()
1733        );
1734
1735        futures::select!(
1736            _ = server => {}
1737            _ = client => unreachable!(),
1738        );
1739    }
1740
1741    #[derive(Default)]
1742    struct IoMockInterface {
1743        do_checks: bool,
1744        expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1745        return_errors: bool,
1746    }
1747
1748    #[derive(Debug)]
1749    enum ExpectedOp {
1750        Read(u64, u32, u64),
1751        Write(u64, u32, u64),
1752        Trim(u64, u32),
1753        Flush,
1754    }
1755
1756    impl super::async_interface::Interface for IoMockInterface {
1757        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1758            Ok(())
1759        }
1760
1761        fn get_info(&self) -> Cow<'_, DeviceInfo> {
1762            Cow::Owned(test_device_info())
1763        }
1764
1765        async fn read(
1766            &self,
1767            device_block_offset: u64,
1768            block_count: u32,
1769            _vmo: &Arc<zx::Vmo>,
1770            vmo_offset: u64,
1771            _opts: ReadOptions,
1772            _trace_flow_id: TraceFlowId,
1773        ) -> Result<(), zx::Status> {
1774            if self.return_errors {
1775                Err(zx::Status::INTERNAL)
1776            } else {
1777                if self.do_checks {
1778                    assert_matches!(
1779                        self.expected_op.lock().take(),
1780                        Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1781                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1782                        "Read {device_block_offset} {block_count} {vmo_offset}"
1783                    );
1784                }
1785                Ok(())
1786            }
1787        }
1788
1789        async fn write(
1790            &self,
1791            device_block_offset: u64,
1792            block_count: u32,
1793            _vmo: &Arc<zx::Vmo>,
1794            vmo_offset: u64,
1795            _write_opts: WriteOptions,
1796            _trace_flow_id: TraceFlowId,
1797        ) -> Result<(), zx::Status> {
1798            if self.return_errors {
1799                Err(zx::Status::NOT_SUPPORTED)
1800            } else {
1801                if self.do_checks {
1802                    assert_matches!(
1803                        self.expected_op.lock().take(),
1804                        Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1805                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1806                        "Write {device_block_offset} {block_count} {vmo_offset}"
1807                    );
1808                }
1809                Ok(())
1810            }
1811        }
1812
1813        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1814            if self.return_errors {
1815                Err(zx::Status::NO_RESOURCES)
1816            } else {
1817                if self.do_checks {
1818                    assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1819                }
1820                Ok(())
1821            }
1822        }
1823
1824        async fn trim(
1825            &self,
1826            device_block_offset: u64,
1827            block_count: u32,
1828            _trace_flow_id: TraceFlowId,
1829        ) -> Result<(), zx::Status> {
1830            if self.return_errors {
1831                Err(zx::Status::NO_MEMORY)
1832            } else {
1833                if self.do_checks {
1834                    assert_matches!(
1835                        self.expected_op.lock().take(),
1836                        Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1837                            block_count == b,
1838                        "Trim {device_block_offset} {block_count}"
1839                    );
1840                }
1841                Ok(())
1842            }
1843        }
1844    }
1845
1846    #[fuchsia::test]
1847    async fn test_io() {
1848        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1849
1850        let expected_op = Arc::new(Mutex::new(None));
1851        let expected_op_clone = expected_op.clone();
1852
1853        let server = async {
1854            let block_server = BlockServer::new(
1855                BLOCK_SIZE,
1856                Arc::new(IoMockInterface {
1857                    return_errors: false,
1858                    do_checks: true,
1859                    expected_op: expected_op_clone,
1860                }),
1861            );
1862            block_server.handle_requests(stream).await.unwrap();
1863        };
1864
1865        let client = async move {
1866            let (session_proxy, server) = fidl::endpoints::create_proxy();
1867
1868            proxy.open_session(server).unwrap();
1869
1870            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1871            let vmo_id = session_proxy
1872                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1873                .await
1874                .unwrap()
1875                .unwrap();
1876
1877            let mut fifo =
1878                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1879            let (mut reader, mut writer) = fifo.async_io();
1880
1881            // READ
1882            *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1883            writer
1884                .write_entries(&BlockFifoRequest {
1885                    command: BlockFifoCommand {
1886                        opcode: BlockOpcode::Read.into_primitive(),
1887                        ..Default::default()
1888                    },
1889                    vmoid: vmo_id.id,
1890                    dev_offset: 1,
1891                    length: 2,
1892                    vmo_offset: 3,
1893                    ..Default::default()
1894                })
1895                .await
1896                .unwrap();
1897
1898            let mut response = BlockFifoResponse::default();
1899            reader.read_entries(&mut response).await.unwrap();
1900            assert_eq!(response.status, zx::sys::ZX_OK);
1901
1902            // WRITE
1903            *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1904            writer
1905                .write_entries(&BlockFifoRequest {
1906                    command: BlockFifoCommand {
1907                        opcode: BlockOpcode::Write.into_primitive(),
1908                        ..Default::default()
1909                    },
1910                    vmoid: vmo_id.id,
1911                    dev_offset: 4,
1912                    length: 5,
1913                    vmo_offset: 6,
1914                    ..Default::default()
1915                })
1916                .await
1917                .unwrap();
1918
1919            let mut response = BlockFifoResponse::default();
1920            reader.read_entries(&mut response).await.unwrap();
1921            assert_eq!(response.status, zx::sys::ZX_OK);
1922
1923            // FLUSH
1924            *expected_op.lock() = Some(ExpectedOp::Flush);
1925            writer
1926                .write_entries(&BlockFifoRequest {
1927                    command: BlockFifoCommand {
1928                        opcode: BlockOpcode::Flush.into_primitive(),
1929                        ..Default::default()
1930                    },
1931                    ..Default::default()
1932                })
1933                .await
1934                .unwrap();
1935
1936            reader.read_entries(&mut response).await.unwrap();
1937            assert_eq!(response.status, zx::sys::ZX_OK);
1938
1939            // TRIM
1940            *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1941            writer
1942                .write_entries(&BlockFifoRequest {
1943                    command: BlockFifoCommand {
1944                        opcode: BlockOpcode::Trim.into_primitive(),
1945                        ..Default::default()
1946                    },
1947                    dev_offset: 7,
1948                    length: 8,
1949                    ..Default::default()
1950                })
1951                .await
1952                .unwrap();
1953
1954            reader.read_entries(&mut response).await.unwrap();
1955            assert_eq!(response.status, zx::sys::ZX_OK);
1956
1957            std::mem::drop(proxy);
1958        };
1959
1960        futures::join!(server, client);
1961    }
1962
1963    #[fuchsia::test]
1964    async fn test_io_errors() {
1965        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
1966
1967        futures::join!(
1968            async {
1969                let block_server = BlockServer::new(
1970                    BLOCK_SIZE,
1971                    Arc::new(IoMockInterface {
1972                        return_errors: true,
1973                        do_checks: false,
1974                        expected_op: Arc::new(Mutex::new(None)),
1975                    }),
1976                );
1977                block_server.handle_requests(stream).await.unwrap();
1978            },
1979            async move {
1980                let (session_proxy, server) = fidl::endpoints::create_proxy();
1981
1982                proxy.open_session(server).unwrap();
1983
1984                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1985                let vmo_id = session_proxy
1986                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1987                    .await
1988                    .unwrap()
1989                    .unwrap();
1990
1991                let mut fifo =
1992                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1993                let (mut reader, mut writer) = fifo.async_io();
1994
1995                // READ
1996                writer
1997                    .write_entries(&BlockFifoRequest {
1998                        command: BlockFifoCommand {
1999                            opcode: BlockOpcode::Read.into_primitive(),
2000                            ..Default::default()
2001                        },
2002                        vmoid: vmo_id.id,
2003                        length: 1,
2004                        reqid: 1,
2005                        ..Default::default()
2006                    })
2007                    .await
2008                    .unwrap();
2009
2010                let mut response = BlockFifoResponse::default();
2011                reader.read_entries(&mut response).await.unwrap();
2012                assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
2013
2014                // WRITE
2015                writer
2016                    .write_entries(&BlockFifoRequest {
2017                        command: BlockFifoCommand {
2018                            opcode: BlockOpcode::Write.into_primitive(),
2019                            ..Default::default()
2020                        },
2021                        vmoid: vmo_id.id,
2022                        length: 1,
2023                        reqid: 2,
2024                        ..Default::default()
2025                    })
2026                    .await
2027                    .unwrap();
2028
2029                reader.read_entries(&mut response).await.unwrap();
2030                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
2031
2032                // FLUSH
2033                writer
2034                    .write_entries(&BlockFifoRequest {
2035                        command: BlockFifoCommand {
2036                            opcode: BlockOpcode::Flush.into_primitive(),
2037                            ..Default::default()
2038                        },
2039                        reqid: 3,
2040                        ..Default::default()
2041                    })
2042                    .await
2043                    .unwrap();
2044
2045                reader.read_entries(&mut response).await.unwrap();
2046                assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
2047
2048                // TRIM
2049                writer
2050                    .write_entries(&BlockFifoRequest {
2051                        command: BlockFifoCommand {
2052                            opcode: BlockOpcode::Trim.into_primitive(),
2053                            ..Default::default()
2054                        },
2055                        reqid: 4,
2056                        length: 1,
2057                        ..Default::default()
2058                    })
2059                    .await
2060                    .unwrap();
2061
2062                reader.read_entries(&mut response).await.unwrap();
2063                assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
2064
2065                std::mem::drop(proxy);
2066            }
2067        );
2068    }
2069
2070    #[fuchsia::test]
2071    async fn test_invalid_args() {
2072        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2073
2074        futures::join!(
2075            async {
2076                let block_server = BlockServer::new(
2077                    BLOCK_SIZE,
2078                    Arc::new(IoMockInterface {
2079                        return_errors: false,
2080                        do_checks: false,
2081                        expected_op: Arc::new(Mutex::new(None)),
2082                    }),
2083                );
2084                block_server.handle_requests(stream).await.unwrap();
2085            },
2086            async move {
2087                let (session_proxy, server) = fidl::endpoints::create_proxy();
2088
2089                proxy.open_session(server).unwrap();
2090
2091                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2092                let vmo_id = session_proxy
2093                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2094                    .await
2095                    .unwrap()
2096                    .unwrap();
2097
2098                let mut fifo =
2099                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2100
2101                async fn test(
2102                    fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
2103                    request: BlockFifoRequest,
2104                ) -> Result<(), zx::Status> {
2105                    let (mut reader, mut writer) = fifo.async_io();
2106                    writer.write_entries(&request).await.unwrap();
2107                    let mut response = BlockFifoResponse::default();
2108                    reader.read_entries(&mut response).await.unwrap();
2109                    zx::Status::ok(response.status)
2110                }
2111
2112                // READ
2113
2114                let good_read_request = || BlockFifoRequest {
2115                    command: BlockFifoCommand {
2116                        opcode: BlockOpcode::Read.into_primitive(),
2117                        ..Default::default()
2118                    },
2119                    length: 1,
2120                    vmoid: vmo_id.id,
2121                    ..Default::default()
2122                };
2123
2124                assert_eq!(
2125                    test(
2126                        &mut fifo,
2127                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
2128                    )
2129                    .await,
2130                    Err(zx::Status::IO)
2131                );
2132
2133                assert_eq!(
2134                    test(
2135                        &mut fifo,
2136                        BlockFifoRequest {
2137                            vmo_offset: 0xffff_ffff_ffff_ffff,
2138                            ..good_read_request()
2139                        }
2140                    )
2141                    .await,
2142                    Err(zx::Status::OUT_OF_RANGE)
2143                );
2144
2145                assert_eq!(
2146                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
2147                    Err(zx::Status::INVALID_ARGS)
2148                );
2149
2150                // WRITE
2151
2152                let good_write_request = || BlockFifoRequest {
2153                    command: BlockFifoCommand {
2154                        opcode: BlockOpcode::Write.into_primitive(),
2155                        ..Default::default()
2156                    },
2157                    length: 1,
2158                    vmoid: vmo_id.id,
2159                    ..Default::default()
2160                };
2161
2162                assert_eq!(
2163                    test(
2164                        &mut fifo,
2165                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
2166                    )
2167                    .await,
2168                    Err(zx::Status::IO)
2169                );
2170
2171                assert_eq!(
2172                    test(
2173                        &mut fifo,
2174                        BlockFifoRequest {
2175                            vmo_offset: 0xffff_ffff_ffff_ffff,
2176                            ..good_write_request()
2177                        }
2178                    )
2179                    .await,
2180                    Err(zx::Status::OUT_OF_RANGE)
2181                );
2182
2183                assert_eq!(
2184                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2185                    Err(zx::Status::INVALID_ARGS)
2186                );
2187
2188                // CLOSE VMO
2189
2190                assert_eq!(
2191                    test(
2192                        &mut fifo,
2193                        BlockFifoRequest {
2194                            command: BlockFifoCommand {
2195                                opcode: BlockOpcode::CloseVmo.into_primitive(),
2196                                ..Default::default()
2197                            },
2198                            vmoid: vmo_id.id + 1,
2199                            ..Default::default()
2200                        }
2201                    )
2202                    .await,
2203                    Err(zx::Status::IO)
2204                );
2205
2206                std::mem::drop(proxy);
2207            }
2208        );
2209    }
2210
2211    #[fuchsia::test]
2212    async fn test_concurrent_requests() {
2213        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2214
2215        let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2216        let waiting_readers_clone = waiting_readers.clone();
2217
2218        futures::join!(
2219            async move {
2220                let block_server = BlockServer::new(
2221                    BLOCK_SIZE,
2222                    Arc::new(MockInterface {
2223                        read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2224                            let (tx, rx) = oneshot::channel();
2225                            waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2226                            Box::pin(async move {
2227                                let _ = rx.await;
2228                                Ok(())
2229                            })
2230                        })),
2231                        ..MockInterface::default()
2232                    }),
2233                );
2234                block_server.handle_requests(stream).await.unwrap();
2235            },
2236            async move {
2237                let (session_proxy, server) = fidl::endpoints::create_proxy();
2238
2239                proxy.open_session(server).unwrap();
2240
2241                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2242                let vmo_id = session_proxy
2243                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2244                    .await
2245                    .unwrap()
2246                    .unwrap();
2247
2248                let mut fifo =
2249                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2250                let (mut reader, mut writer) = fifo.async_io();
2251
2252                writer
2253                    .write_entries(&BlockFifoRequest {
2254                        command: BlockFifoCommand {
2255                            opcode: BlockOpcode::Read.into_primitive(),
2256                            ..Default::default()
2257                        },
2258                        reqid: 1,
2259                        dev_offset: 1, // Intentionally use the same as `reqid`.
2260                        vmoid: vmo_id.id,
2261                        length: 1,
2262                        ..Default::default()
2263                    })
2264                    .await
2265                    .unwrap();
2266
2267                writer
2268                    .write_entries(&BlockFifoRequest {
2269                        command: BlockFifoCommand {
2270                            opcode: BlockOpcode::Read.into_primitive(),
2271                            ..Default::default()
2272                        },
2273                        reqid: 2,
2274                        dev_offset: 2,
2275                        vmoid: vmo_id.id,
2276                        length: 1,
2277                        ..Default::default()
2278                    })
2279                    .await
2280                    .unwrap();
2281
2282                // Wait till both those entries are pending.
2283                poll_fn(|cx: &mut Context<'_>| {
2284                    if waiting_readers.lock().len() == 2 {
2285                        Poll::Ready(())
2286                    } else {
2287                        // Yield to the executor.
2288                        cx.waker().wake_by_ref();
2289                        Poll::Pending
2290                    }
2291                })
2292                .await;
2293
2294                let mut response = BlockFifoResponse::default();
2295                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2296
2297                let (id, tx) = waiting_readers.lock().pop().unwrap();
2298                tx.send(()).unwrap();
2299
2300                reader.read_entries(&mut response).await.unwrap();
2301                assert_eq!(response.status, zx::sys::ZX_OK);
2302                assert_eq!(response.reqid, id);
2303
2304                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2305
2306                let (id, tx) = waiting_readers.lock().pop().unwrap();
2307                tx.send(()).unwrap();
2308
2309                reader.read_entries(&mut response).await.unwrap();
2310                assert_eq!(response.status, zx::sys::ZX_OK);
2311                assert_eq!(response.reqid, id);
2312            }
2313        );
2314    }
2315
2316    #[fuchsia::test]
2317    async fn test_groups() {
2318        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2319
2320        futures::join!(
2321            async move {
2322                let block_server = BlockServer::new(
2323                    BLOCK_SIZE,
2324                    Arc::new(MockInterface {
2325                        read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2326                        ..MockInterface::default()
2327                    }),
2328                );
2329                block_server.handle_requests(stream).await.unwrap();
2330            },
2331            async move {
2332                let (session_proxy, server) = fidl::endpoints::create_proxy();
2333
2334                proxy.open_session(server).unwrap();
2335
2336                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2337                let vmo_id = session_proxy
2338                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2339                    .await
2340                    .unwrap()
2341                    .unwrap();
2342
2343                let mut fifo =
2344                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2345                let (mut reader, mut writer) = fifo.async_io();
2346
2347                writer
2348                    .write_entries(&BlockFifoRequest {
2349                        command: BlockFifoCommand {
2350                            opcode: BlockOpcode::Read.into_primitive(),
2351                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2352                            ..Default::default()
2353                        },
2354                        group: 1,
2355                        vmoid: vmo_id.id,
2356                        length: 1,
2357                        ..Default::default()
2358                    })
2359                    .await
2360                    .unwrap();
2361
2362                writer
2363                    .write_entries(&BlockFifoRequest {
2364                        command: BlockFifoCommand {
2365                            opcode: BlockOpcode::Read.into_primitive(),
2366                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2367                            ..Default::default()
2368                        },
2369                        reqid: 2,
2370                        group: 1,
2371                        vmoid: vmo_id.id,
2372                        length: 1,
2373                        ..Default::default()
2374                    })
2375                    .await
2376                    .unwrap();
2377
2378                let mut response = BlockFifoResponse::default();
2379                reader.read_entries(&mut response).await.unwrap();
2380                assert_eq!(response.status, zx::sys::ZX_OK);
2381                assert_eq!(response.reqid, 2);
2382                assert_eq!(response.group, 1);
2383            }
2384        );
2385    }
2386
2387    #[fuchsia::test]
2388    async fn test_group_error() {
2389        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2390
2391        let counter = Arc::new(AtomicU64::new(0));
2392        let counter_clone = counter.clone();
2393
2394        futures::join!(
2395            async move {
2396                let block_server = BlockServer::new(
2397                    BLOCK_SIZE,
2398                    Arc::new(MockInterface {
2399                        read_hook: Some(Box::new(move |_, _, _, _| {
2400                            counter_clone.fetch_add(1, Ordering::Relaxed);
2401                            Box::pin(async { Err(zx::Status::BAD_STATE) })
2402                        })),
2403                        ..MockInterface::default()
2404                    }),
2405                );
2406                block_server.handle_requests(stream).await.unwrap();
2407            },
2408            async move {
2409                let (session_proxy, server) = fidl::endpoints::create_proxy();
2410
2411                proxy.open_session(server).unwrap();
2412
2413                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2414                let vmo_id = session_proxy
2415                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2416                    .await
2417                    .unwrap()
2418                    .unwrap();
2419
2420                let mut fifo =
2421                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2422                let (mut reader, mut writer) = fifo.async_io();
2423
2424                writer
2425                    .write_entries(&BlockFifoRequest {
2426                        command: BlockFifoCommand {
2427                            opcode: BlockOpcode::Read.into_primitive(),
2428                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2429                            ..Default::default()
2430                        },
2431                        group: 1,
2432                        vmoid: vmo_id.id,
2433                        length: 1,
2434                        ..Default::default()
2435                    })
2436                    .await
2437                    .unwrap();
2438
2439                // Wait until processed.
2440                poll_fn(|cx: &mut Context<'_>| {
2441                    if counter.load(Ordering::Relaxed) == 1 {
2442                        Poll::Ready(())
2443                    } else {
2444                        // Yield to the executor.
2445                        cx.waker().wake_by_ref();
2446                        Poll::Pending
2447                    }
2448                })
2449                .await;
2450
2451                let mut response = BlockFifoResponse::default();
2452                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2453
2454                writer
2455                    .write_entries(&BlockFifoRequest {
2456                        command: BlockFifoCommand {
2457                            opcode: BlockOpcode::Read.into_primitive(),
2458                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2459                            ..Default::default()
2460                        },
2461                        group: 1,
2462                        vmoid: vmo_id.id,
2463                        length: 1,
2464                        ..Default::default()
2465                    })
2466                    .await
2467                    .unwrap();
2468
2469                writer
2470                    .write_entries(&BlockFifoRequest {
2471                        command: BlockFifoCommand {
2472                            opcode: BlockOpcode::Read.into_primitive(),
2473                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2474                            ..Default::default()
2475                        },
2476                        reqid: 2,
2477                        group: 1,
2478                        vmoid: vmo_id.id,
2479                        length: 1,
2480                        ..Default::default()
2481                    })
2482                    .await
2483                    .unwrap();
2484
2485                reader.read_entries(&mut response).await.unwrap();
2486                assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2487                assert_eq!(response.reqid, 2);
2488                assert_eq!(response.group, 1);
2489
2490                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2491
2492                // Only the first request should have been processed.
2493                assert_eq!(counter.load(Ordering::Relaxed), 1);
2494            }
2495        );
2496    }
2497
2498    #[fuchsia::test]
2499    async fn test_group_with_two_lasts() {
2500        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2501
2502        let (tx, rx) = oneshot::channel();
2503
2504        futures::join!(
2505            async move {
2506                let rx = Mutex::new(Some(rx));
2507                let block_server = BlockServer::new(
2508                    BLOCK_SIZE,
2509                    Arc::new(MockInterface {
2510                        read_hook: Some(Box::new(move |_, _, _, _| {
2511                            let rx = rx.lock().take().unwrap();
2512                            Box::pin(async {
2513                                let _ = rx.await;
2514                                Ok(())
2515                            })
2516                        })),
2517                        ..MockInterface::default()
2518                    }),
2519                );
2520                block_server.handle_requests(stream).await.unwrap();
2521            },
2522            async move {
2523                let (session_proxy, server) = fidl::endpoints::create_proxy();
2524
2525                proxy.open_session(server).unwrap();
2526
2527                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2528                let vmo_id = session_proxy
2529                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2530                    .await
2531                    .unwrap()
2532                    .unwrap();
2533
2534                let mut fifo =
2535                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2536                let (mut reader, mut writer) = fifo.async_io();
2537
2538                writer
2539                    .write_entries(&BlockFifoRequest {
2540                        command: BlockFifoCommand {
2541                            opcode: BlockOpcode::Read.into_primitive(),
2542                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2543                            ..Default::default()
2544                        },
2545                        reqid: 1,
2546                        group: 1,
2547                        vmoid: vmo_id.id,
2548                        length: 1,
2549                        ..Default::default()
2550                    })
2551                    .await
2552                    .unwrap();
2553
2554                writer
2555                    .write_entries(&BlockFifoRequest {
2556                        command: BlockFifoCommand {
2557                            opcode: BlockOpcode::Read.into_primitive(),
2558                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2559                            ..Default::default()
2560                        },
2561                        reqid: 2,
2562                        group: 1,
2563                        vmoid: vmo_id.id,
2564                        length: 1,
2565                        ..Default::default()
2566                    })
2567                    .await
2568                    .unwrap();
2569
2570                // Send an independent request to flush through the fifo.
2571                writer
2572                    .write_entries(&BlockFifoRequest {
2573                        command: BlockFifoCommand {
2574                            opcode: BlockOpcode::CloseVmo.into_primitive(),
2575                            ..Default::default()
2576                        },
2577                        reqid: 3,
2578                        vmoid: vmo_id.id,
2579                        ..Default::default()
2580                    })
2581                    .await
2582                    .unwrap();
2583
2584                // It should succeed.
2585                let mut response = BlockFifoResponse::default();
2586                reader.read_entries(&mut response).await.unwrap();
2587                assert_eq!(response.status, zx::sys::ZX_OK);
2588                assert_eq!(response.reqid, 3);
2589
2590                // Now release the original request.
2591                tx.send(()).unwrap();
2592
2593                // The response should be for the first message tagged as last, and it should be
2594                // an error because we sent two messages with the LAST marker.
2595                let mut response = BlockFifoResponse::default();
2596                reader.read_entries(&mut response).await.unwrap();
2597                assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2598                assert_eq!(response.reqid, 1);
2599                assert_eq!(response.group, 1);
2600            }
2601        );
2602    }
2603
2604    #[fuchsia::test(allow_stalls = false)]
2605    async fn test_requests_dont_block_sessions() {
2606        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2607
2608        let (tx, rx) = oneshot::channel();
2609
2610        fasync::Task::local(async move {
2611            let rx = Mutex::new(Some(rx));
2612            let block_server = BlockServer::new(
2613                BLOCK_SIZE,
2614                Arc::new(MockInterface {
2615                    read_hook: Some(Box::new(move |_, _, _, _| {
2616                        let rx = rx.lock().take().unwrap();
2617                        Box::pin(async {
2618                            let _ = rx.await;
2619                            Ok(())
2620                        })
2621                    })),
2622                    ..MockInterface::default()
2623                }),
2624            );
2625            block_server.handle_requests(stream).await.unwrap();
2626        })
2627        .detach();
2628
2629        let mut fut = pin!(async {
2630            let (session_proxy, server) = fidl::endpoints::create_proxy();
2631
2632            proxy.open_session(server).unwrap();
2633
2634            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2635            let vmo_id = session_proxy
2636                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2637                .await
2638                .unwrap()
2639                .unwrap();
2640
2641            let mut fifo =
2642                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2643            let (mut reader, mut writer) = fifo.async_io();
2644
2645            writer
2646                .write_entries(&BlockFifoRequest {
2647                    command: BlockFifoCommand {
2648                        opcode: BlockOpcode::Read.into_primitive(),
2649                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2650                        ..Default::default()
2651                    },
2652                    reqid: 1,
2653                    group: 1,
2654                    vmoid: vmo_id.id,
2655                    length: 1,
2656                    ..Default::default()
2657                })
2658                .await
2659                .unwrap();
2660
2661            let mut response = BlockFifoResponse::default();
2662            reader.read_entries(&mut response).await.unwrap();
2663            assert_eq!(response.status, zx::sys::ZX_OK);
2664        });
2665
2666        // The response won't come back until we send on `tx`.
2667        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2668
2669        let mut fut2 = pin!(proxy.get_volume_info());
2670
2671        // get_volume_info is set up to stall forever.
2672        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2673
2674        // If we now free up the first future, it should resolve; the stalled call to
2675        // get_volume_info should not block the fifo response.
2676        let _ = tx.send(());
2677
2678        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2679    }
2680
2681    #[fuchsia::test]
2682    async fn test_request_flow_control() {
2683        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2684
2685        // The client will ensure that MAX_REQUESTS are queued up before firing `event`, and the
2686        // server will block until that happens.
2687        const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2688        let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2689        let event_clone = event.clone();
2690        futures::join!(
2691            async move {
2692                let block_server = BlockServer::new(
2693                    BLOCK_SIZE,
2694                    Arc::new(MockInterface {
2695                        read_hook: Some(Box::new(move |_, _, _, _| {
2696                            let event_clone = event_clone.clone();
2697                            Box::pin(async move {
2698                                if !event_clone.1.load(Ordering::SeqCst) {
2699                                    event_clone.0.listen().await;
2700                                }
2701                                Ok(())
2702                            })
2703                        })),
2704                        ..MockInterface::default()
2705                    }),
2706                );
2707                block_server.handle_requests(stream).await.unwrap();
2708            },
2709            async move {
2710                let (session_proxy, server) = fidl::endpoints::create_proxy();
2711
2712                proxy.open_session(server).unwrap();
2713
2714                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2715                let vmo_id = session_proxy
2716                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2717                    .await
2718                    .unwrap()
2719                    .unwrap();
2720
2721                let mut fifo =
2722                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2723                let (mut reader, mut writer) = fifo.async_io();
2724
2725                for i in 0..MAX_REQUESTS {
2726                    writer
2727                        .write_entries(&BlockFifoRequest {
2728                            command: BlockFifoCommand {
2729                                opcode: BlockOpcode::Read.into_primitive(),
2730                                ..Default::default()
2731                            },
2732                            reqid: (i + 1) as u32,
2733                            dev_offset: i,
2734                            vmoid: vmo_id.id,
2735                            length: 1,
2736                            ..Default::default()
2737                        })
2738                        .await
2739                        .unwrap();
2740                }
2741                assert!(
2742                    futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2743                        command: BlockFifoCommand {
2744                            opcode: BlockOpcode::Read.into_primitive(),
2745                            ..Default::default()
2746                        },
2747                        reqid: u32::MAX,
2748                        dev_offset: MAX_REQUESTS,
2749                        vmoid: vmo_id.id,
2750                        length: 1,
2751                        ..Default::default()
2752                    })))
2753                    .is_pending()
2754                );
2755                // OK, let the server start to process.
2756                event.1.store(true, Ordering::SeqCst);
2757                event.0.notify(usize::MAX);
2758                // For each entry we read, make sure we can write a new one in.
2759                let mut finished_reqids = vec![];
2760                for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2761                    let mut response = BlockFifoResponse::default();
2762                    reader.read_entries(&mut response).await.unwrap();
2763                    assert_eq!(response.status, zx::sys::ZX_OK);
2764                    finished_reqids.push(response.reqid);
2765                    writer
2766                        .write_entries(&BlockFifoRequest {
2767                            command: BlockFifoCommand {
2768                                opcode: BlockOpcode::Read.into_primitive(),
2769                                ..Default::default()
2770                            },
2771                            reqid: (i + 1) as u32,
2772                            dev_offset: i,
2773                            vmoid: vmo_id.id,
2774                            length: 1,
2775                            ..Default::default()
2776                        })
2777                        .await
2778                        .unwrap();
2779                }
2780                let mut response = BlockFifoResponse::default();
2781                for _ in 0..MAX_REQUESTS {
2782                    reader.read_entries(&mut response).await.unwrap();
2783                    assert_eq!(response.status, zx::sys::ZX_OK);
2784                    finished_reqids.push(response.reqid);
2785                }
2786                // Verify that we got a response for each request.  Note that we can't assume FIFO
2787                // ordering.
2788                finished_reqids.sort();
2789                assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2790                let mut i = 1;
2791                for reqid in finished_reqids {
2792                    assert_eq!(reqid, i);
2793                    i += 1;
2794                }
2795            }
2796        );
2797    }
2798
2799    #[fuchsia::test]
2800    async fn test_passthrough_io_with_fixed_map() {
2801        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
2802
2803        let expected_op = Arc::new(Mutex::new(None));
2804        let expected_op_clone = expected_op.clone();
2805        futures::join!(
2806            async {
2807                let block_server = BlockServer::new(
2808                    BLOCK_SIZE,
2809                    Arc::new(IoMockInterface {
2810                        return_errors: false,
2811                        do_checks: true,
2812                        expected_op: expected_op_clone,
2813                    }),
2814                );
2815                block_server.handle_requests(stream).await.unwrap();
2816            },
2817            async move {
2818                let (session_proxy, server) = fidl::endpoints::create_proxy();
2819
2820                let mapping = fblock::BlockOffsetMapping {
2821                    source_block_offset: 0,
2822                    target_block_offset: 10,
2823                    length: 20,
2824                };
2825                proxy.open_session_with_offset_map(server, &mapping).unwrap();
2826
2827                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2828                let vmo_id = session_proxy
2829                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2830                    .await
2831                    .unwrap()
2832                    .unwrap();
2833
2834                let mut fifo =
2835                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2836                let (mut reader, mut writer) = fifo.async_io();
2837
2838                // READ
2839                *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2840                writer
2841                    .write_entries(&BlockFifoRequest {
2842                        command: BlockFifoCommand {
2843                            opcode: BlockOpcode::Read.into_primitive(),
2844                            ..Default::default()
2845                        },
2846                        vmoid: vmo_id.id,
2847                        dev_offset: 1,
2848                        length: 2,
2849                        vmo_offset: 3,
2850                        ..Default::default()
2851                    })
2852                    .await
2853                    .unwrap();
2854
2855                let mut response = BlockFifoResponse::default();
2856                reader.read_entries(&mut response).await.unwrap();
2857                assert_eq!(response.status, zx::sys::ZX_OK);
2858
2859                // WRITE
2860                *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2861                writer
2862                    .write_entries(&BlockFifoRequest {
2863                        command: BlockFifoCommand {
2864                            opcode: BlockOpcode::Write.into_primitive(),
2865                            ..Default::default()
2866                        },
2867                        vmoid: vmo_id.id,
2868                        dev_offset: 4,
2869                        length: 5,
2870                        vmo_offset: 6,
2871                        ..Default::default()
2872                    })
2873                    .await
2874                    .unwrap();
2875
2876                reader.read_entries(&mut response).await.unwrap();
2877                assert_eq!(response.status, zx::sys::ZX_OK);
2878
2879                // FLUSH
2880                *expected_op.lock() = Some(ExpectedOp::Flush);
2881                writer
2882                    .write_entries(&BlockFifoRequest {
2883                        command: BlockFifoCommand {
2884                            opcode: BlockOpcode::Flush.into_primitive(),
2885                            ..Default::default()
2886                        },
2887                        ..Default::default()
2888                    })
2889                    .await
2890                    .unwrap();
2891
2892                reader.read_entries(&mut response).await.unwrap();
2893                assert_eq!(response.status, zx::sys::ZX_OK);
2894
2895                // TRIM
2896                *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2897                writer
2898                    .write_entries(&BlockFifoRequest {
2899                        command: BlockFifoCommand {
2900                            opcode: BlockOpcode::Trim.into_primitive(),
2901                            ..Default::default()
2902                        },
2903                        dev_offset: 7,
2904                        length: 3,
2905                        ..Default::default()
2906                    })
2907                    .await
2908                    .unwrap();
2909
2910                reader.read_entries(&mut response).await.unwrap();
2911                assert_eq!(response.status, zx::sys::ZX_OK);
2912
2913                // READ past window
2914                *expected_op.lock() = None;
2915                writer
2916                    .write_entries(&BlockFifoRequest {
2917                        command: BlockFifoCommand {
2918                            opcode: BlockOpcode::Read.into_primitive(),
2919                            ..Default::default()
2920                        },
2921                        vmoid: vmo_id.id,
2922                        dev_offset: 19,
2923                        length: 2,
2924                        vmo_offset: 3,
2925                        ..Default::default()
2926                    })
2927                    .await
2928                    .unwrap();
2929
2930                reader.read_entries(&mut response).await.unwrap();
2931                assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2932
2933                std::mem::drop(proxy);
2934            }
2935        );
2936    }
2937
2938    #[fuchsia::test]
2939    fn operation_map() {
2940        const BLOCK_SIZE: u32 = 512;
2941
2942        #[track_caller]
2943        fn expect_map_result(
2944            mut operation: Operation,
2945            mapping: Option<BlockOffsetMapping>,
2946            max_blocks: Option<NonZero<u32>>,
2947            expected_operations: Vec<Operation>,
2948        ) {
2949            let mut ops = vec![];
2950            while let Some(remainder) =
2951                operation.map(mapping.as_ref(), max_blocks.clone(), BLOCK_SIZE)
2952            {
2953                ops.push(operation);
2954                operation = remainder;
2955            }
2956            ops.push(operation);
2957            assert_eq!(ops, expected_operations);
2958        }
2959
2960        // No limits
2961        expect_map_result(
2962            Operation::Read {
2963                device_block_offset: 10,
2964                block_count: 200,
2965                _unused: 0,
2966                vmo_offset: 0,
2967                options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2968            },
2969            None,
2970            None,
2971            vec![Operation::Read {
2972                device_block_offset: 10,
2973                block_count: 200,
2974                _unused: 0,
2975                vmo_offset: 0,
2976                options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2977            }],
2978        );
2979
2980        // Max block count
2981        expect_map_result(
2982            Operation::Read {
2983                device_block_offset: 10,
2984                block_count: 200,
2985                _unused: 0,
2986                vmo_offset: 0,
2987                options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2988            },
2989            None,
2990            NonZero::new(120),
2991            vec![
2992                Operation::Read {
2993                    device_block_offset: 10,
2994                    block_count: 120,
2995                    _unused: 0,
2996                    vmo_offset: 0,
2997                    options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
2998                },
2999                Operation::Read {
3000                    device_block_offset: 130,
3001                    block_count: 80,
3002                    _unused: 0,
3003                    vmo_offset: 120 * BLOCK_SIZE as u64,
3004                    options: ReadOptions {
3005                        // The DUN should be offset by the number of blocks in the first request.
3006                        inline_crypto: InlineCryptoOptions::enabled(1, 1000 + 120),
3007                    },
3008                },
3009            ],
3010        );
3011        expect_map_result(
3012            Operation::Trim { device_block_offset: 10, block_count: 200 },
3013            None,
3014            NonZero::new(120),
3015            vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
3016        );
3017
3018        // Remapping + Max block count
3019        expect_map_result(
3020            Operation::Read {
3021                device_block_offset: 10,
3022                block_count: 200,
3023                _unused: 0,
3024                vmo_offset: 0,
3025                options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3026            },
3027            Some(BlockOffsetMapping {
3028                source_block_offset: 10,
3029                target_block_offset: 100,
3030                length: 200,
3031            }),
3032            NonZero::new(120),
3033            vec![
3034                Operation::Read {
3035                    device_block_offset: 100,
3036                    block_count: 120,
3037                    _unused: 0,
3038                    vmo_offset: 0,
3039                    options: ReadOptions { inline_crypto: InlineCryptoOptions::enabled(1, 1000) },
3040                },
3041                Operation::Read {
3042                    device_block_offset: 220,
3043                    block_count: 80,
3044                    _unused: 0,
3045                    vmo_offset: 120 * BLOCK_SIZE as u64,
3046                    options: ReadOptions {
3047                        inline_crypto: InlineCryptoOptions::enabled(1, 1000 + 120),
3048                    },
3049                },
3050            ],
3051        );
3052        expect_map_result(
3053            Operation::Trim { device_block_offset: 10, block_count: 200 },
3054            Some(BlockOffsetMapping {
3055                source_block_offset: 10,
3056                target_block_offset: 100,
3057                length: 200,
3058            }),
3059            NonZero::new(120),
3060            vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
3061        );
3062    }
3063
3064    // Verifies that if the pre-flush (for a simulated barrier) fails, the write is not executed.
3065    #[fuchsia::test]
3066    async fn test_pre_barrier_flush_failure() {
3067        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
3068
3069        struct NoBarrierInterface;
3070        impl super::async_interface::Interface for NoBarrierInterface {
3071            fn get_info(&self) -> Cow<'_, DeviceInfo> {
3072                Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3073                    device_flags: fblock::DeviceFlag::empty(), // No BARRIER_SUPPORT
3074                    max_transfer_blocks: NonZero::new(100),
3075                    block_range: Some(0..100),
3076                    type_guid: [0; 16],
3077                    instance_guid: [0; 16],
3078                    name: "test".to_string(),
3079                    flags: 0,
3080                }))
3081            }
3082            async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3083                Ok(())
3084            }
3085            async fn read(
3086                &self,
3087                _: u64,
3088                _: u32,
3089                _: &Arc<zx::Vmo>,
3090                _: u64,
3091                _: ReadOptions,
3092                _: TraceFlowId,
3093            ) -> Result<(), zx::Status> {
3094                unreachable!()
3095            }
3096            async fn write(
3097                &self,
3098                _: u64,
3099                _: u32,
3100                _: &Arc<zx::Vmo>,
3101                _: u64,
3102                _: WriteOptions,
3103                _: TraceFlowId,
3104            ) -> Result<(), zx::Status> {
3105                panic!("Write should not be called");
3106            }
3107            async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3108                Err(zx::Status::IO)
3109            }
3110            async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3111                unreachable!()
3112            }
3113        }
3114
3115        futures::join!(
3116            async move {
3117                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3118                block_server.handle_requests(stream).await.unwrap();
3119            },
3120            async move {
3121                let (session_proxy, server) = fidl::endpoints::create_proxy();
3122                proxy.open_session(server).unwrap();
3123                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3124                let vmo_id = session_proxy
3125                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3126                    .await
3127                    .unwrap()
3128                    .unwrap();
3129
3130                let mut fifo =
3131                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3132                let (mut reader, mut writer) = fifo.async_io();
3133
3134                writer
3135                    .write_entries(&BlockFifoRequest {
3136                        command: BlockFifoCommand {
3137                            opcode: BlockOpcode::Write.into_primitive(),
3138                            flags: BlockIoFlag::PRE_BARRIER.bits(),
3139                            ..Default::default()
3140                        },
3141                        vmoid: vmo_id.id,
3142                        length: 1,
3143                        ..Default::default()
3144                    })
3145                    .await
3146                    .unwrap();
3147
3148                let mut response = BlockFifoResponse::default();
3149                reader.read_entries(&mut response).await.unwrap();
3150                assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3151            }
3152        );
3153    }
3154
3155    // Verifies that if the write fails when a post-flush is required (for a simulated FUA), the
3156    // post-flush is not executed.
3157    #[fuchsia::test]
3158    async fn test_post_barrier_write_failure() {
3159        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fblock::BlockMarker>();
3160
3161        struct NoBarrierInterface;
3162        impl super::async_interface::Interface for NoBarrierInterface {
3163            fn get_info(&self) -> Cow<'_, DeviceInfo> {
3164                Cow::Owned(DeviceInfo::Partition(PartitionInfo {
3165                    device_flags: fblock::DeviceFlag::empty(), // No FUA_SUPPORT
3166                    max_transfer_blocks: NonZero::new(100),
3167                    block_range: Some(0..100),
3168                    type_guid: [0; 16],
3169                    instance_guid: [0; 16],
3170                    name: "test".to_string(),
3171                    flags: 0,
3172                }))
3173            }
3174            async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
3175                Ok(())
3176            }
3177            async fn read(
3178                &self,
3179                _: u64,
3180                _: u32,
3181                _: &Arc<zx::Vmo>,
3182                _: u64,
3183                _: ReadOptions,
3184                _: TraceFlowId,
3185            ) -> Result<(), zx::Status> {
3186                unreachable!()
3187            }
3188            async fn write(
3189                &self,
3190                _: u64,
3191                _: u32,
3192                _: &Arc<zx::Vmo>,
3193                _: u64,
3194                _: WriteOptions,
3195                _: TraceFlowId,
3196            ) -> Result<(), zx::Status> {
3197                Err(zx::Status::IO)
3198            }
3199            async fn flush(&self, _: TraceFlowId) -> Result<(), zx::Status> {
3200                panic!("Flush should not be called")
3201            }
3202            async fn trim(&self, _: u64, _: u32, _: TraceFlowId) -> Result<(), zx::Status> {
3203                unreachable!()
3204            }
3205        }
3206
3207        futures::join!(
3208            async move {
3209                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(NoBarrierInterface));
3210                block_server.handle_requests(stream).await.unwrap();
3211            },
3212            async move {
3213                let (session_proxy, server) = fidl::endpoints::create_proxy();
3214                proxy.open_session(server).unwrap();
3215                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
3216                let vmo_id = session_proxy
3217                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
3218                    .await
3219                    .unwrap()
3220                    .unwrap();
3221
3222                let mut fifo =
3223                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
3224                let (mut reader, mut writer) = fifo.async_io();
3225
3226                writer
3227                    .write_entries(&BlockFifoRequest {
3228                        command: BlockFifoCommand {
3229                            opcode: BlockOpcode::Write.into_primitive(),
3230                            flags: BlockIoFlag::FORCE_ACCESS.bits(),
3231                            ..Default::default()
3232                        },
3233                        vmoid: vmo_id.id,
3234                        length: 1,
3235                        ..Default::default()
3236                    })
3237                    .await
3238                    .unwrap();
3239
3240                let mut response = BlockFifoResponse::default();
3241                reader.read_entries(&mut response).await.unwrap();
3242                assert_eq!(response.status, zx::sys::ZX_ERR_IO);
3243            }
3244        );
3245    }
3246}