_block_server_c_rustc_static/
lib.rs

1// Copyright 2025 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use crate::bin::Bin;
5use anyhow::{Error, anyhow};
6use block_protocol::{BlockFifoRequest, BlockFifoResponse};
7use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
8use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
9use fuchsia_sync::Mutex;
10use futures::{Future, FutureExt as _, TryStreamExt as _};
11use slab::Slab;
12use std::borrow::Cow;
13use std::collections::BTreeMap;
14use std::num::NonZero;
15use std::ops::Range;
16use std::sync::Arc;
17use std::sync::atomic::AtomicU64;
18use zx::HandleBased;
19use {
20    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
21    fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
22};
23
24pub mod async_interface;
25mod bin;
26pub mod c_interface;
27
28pub(crate) const FIFO_MAX_REQUESTS: usize = 64;
29
30type TraceFlowId = Option<NonZero<u64>>;
31
32#[derive(Clone)]
33pub enum DeviceInfo {
34    Block(BlockInfo),
35    Partition(PartitionInfo),
36}
37
38impl DeviceInfo {
39    pub fn block_count(&self) -> Option<u64> {
40        match self {
41            Self::Block(BlockInfo { block_count, .. }) => Some(*block_count),
42            Self::Partition(PartitionInfo { block_range, .. }) => {
43                block_range.as_ref().map(|range| range.end - range.start)
44            }
45        }
46    }
47
48    pub fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
49        match self {
50            Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks.clone(),
51            Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => {
52                max_transfer_blocks.clone()
53            }
54        }
55    }
56
57    fn max_transfer_size(&self, block_size: u32) -> u32 {
58        if let Some(max_blocks) = self.max_transfer_blocks() {
59            max_blocks.get() * block_size
60        } else {
61            MAX_TRANSFER_UNBOUNDED
62        }
63    }
64}
65
66/// Information associated with non-partition block devices.
67#[derive(Clone, Default)]
68pub struct BlockInfo {
69    pub device_flags: fblock::Flag,
70    pub block_count: u64,
71    pub max_transfer_blocks: Option<NonZero<u32>>,
72}
73
74/// Information associated with a block device that is also a partition.
75#[derive(Clone, Default)]
76pub struct PartitionInfo {
77    /// The device flags reported by the underlying device.
78    pub device_flags: fblock::Flag,
79    pub max_transfer_blocks: Option<NonZero<u32>>,
80    /// If `block_range` is None, the partition is a volume and may not be contiguous.
81    /// In this case, the server will use the `get_volume_info` method to get the count of assigned
82    /// slices and use that (along with the slice and block sizes) to determine the block count.
83    pub block_range: Option<Range<u64>>,
84    pub type_guid: [u8; 16],
85    pub instance_guid: [u8; 16],
86    pub name: String,
87    pub flags: u64,
88}
89
90/// We internally keep track of active requests, so that when the server is torn down, we can
91/// deallocate all of the resources for pending requests.
92struct ActiveRequest<S> {
93    session: S,
94    group_or_request: GroupOrRequest,
95    trace_flow_id: TraceFlowId,
96    vmo_bin_key: Option<usize>,
97    status: zx::Status,
98    count: u32,
99    req_id: Option<u32>,
100}
101
102pub struct ActiveRequests<S>(Mutex<ActiveRequestsInner<S>>);
103
104impl<S> Default for ActiveRequests<S> {
105    fn default() -> Self {
106        Self(Mutex::new(ActiveRequestsInner { requests: Slab::default(), vmo_bin: Bin::new() }))
107    }
108}
109
110impl<S> ActiveRequests<S> {
111    fn complete_and_take_response(
112        &self,
113        request_id: RequestId,
114        status: zx::Status,
115    ) -> Option<(S, BlockFifoResponse)> {
116        self.0.lock().complete_and_take_response(request_id, status)
117    }
118}
119
120struct ActiveRequestsInner<S> {
121    requests: Slab<ActiveRequest<S>>,
122    vmo_bin: Bin<Arc<zx::Vmo>>,
123}
124
125// Keeps track of all the requests that are currently being processed
126impl<S> ActiveRequestsInner<S> {
127    /// Completes a request.
128    fn complete(&mut self, request_id: RequestId, status: zx::Status) {
129        let group = &mut self.requests[request_id.0];
130
131        group.count = group.count.checked_sub(1).unwrap();
132        if status != zx::Status::OK && group.status == zx::Status::OK {
133            group.status = status
134        }
135
136        fuchsia_trace::duration!(
137            c"storage",
138            c"block_server::finish_transaction",
139            "request_id" => request_id.0,
140            "group_completed" => group.count == 0,
141            "status" => status.into_raw());
142        if let Some(trace_flow_id) = group.trace_flow_id {
143            fuchsia_trace::flow_step!(
144                c"storage",
145                c"block_server::finish_request",
146                trace_flow_id.get().into()
147            );
148        }
149    }
150
151    /// Takes the response if all requests are finished.
152    fn take_response(&mut self, request_id: RequestId) -> Option<(S, BlockFifoResponse)> {
153        let group = &self.requests[request_id.0];
154        match group.req_id {
155            Some(reqid) if group.count == 0 => {
156                let group = self.requests.remove(request_id.0);
157                if let Some(vmo_bin_key) = group.vmo_bin_key {
158                    self.vmo_bin.release(vmo_bin_key);
159                }
160                Some((
161                    group.session,
162                    BlockFifoResponse {
163                        status: group.status.into_raw(),
164                        reqid,
165                        group: group.group_or_request.group_id().unwrap_or(0),
166                        ..Default::default()
167                    },
168                ))
169            }
170            _ => None,
171        }
172    }
173
174    /// Competes the request and returns a response if the request group is finished.
175    fn complete_and_take_response(
176        &mut self,
177        request_id: RequestId,
178        status: zx::Status,
179    ) -> Option<(S, BlockFifoResponse)> {
180        self.complete(request_id, status);
181        self.take_response(request_id)
182    }
183}
184
185/// BlockServer is an implementation of fuchsia.hardware.block.partition.Partition.
186/// cbindgen:no-export
187pub struct BlockServer<SM> {
188    block_size: u32,
189    session_manager: Arc<SM>,
190}
191
192/// A single entry in `[OffsetMap]`.
193#[derive(Debug)]
194pub struct BlockOffsetMapping {
195    source_block_offset: u64,
196    target_block_offset: u64,
197    length: u64,
198}
199
200impl BlockOffsetMapping {
201    fn are_blocks_within_source_range(&self, blocks: (u64, u32)) -> bool {
202        blocks.0 >= self.source_block_offset
203            && blocks.0 + blocks.1 as u64 - self.source_block_offset <= self.length
204    }
205}
206
207impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
208    type Error = zx::Status;
209
210    fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
211        wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
212        wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
213        Ok(Self {
214            source_block_offset: wire.source_block_offset,
215            target_block_offset: wire.target_block_offset,
216            length: wire.length,
217        })
218    }
219}
220
221/// Remaps the offset of block requests based on an internal map, and truncates long requests.
222pub struct OffsetMap {
223    mapping: Option<BlockOffsetMapping>,
224    max_transfer_blocks: Option<NonZero<u32>>,
225}
226
227impl OffsetMap {
228    /// An OffsetMap that remaps requests.
229    pub fn new(mapping: BlockOffsetMapping, max_transfer_blocks: Option<NonZero<u32>>) -> Self {
230        Self { mapping: Some(mapping), max_transfer_blocks }
231    }
232
233    /// An OffsetMap that just enforces maximum request sizes.
234    pub fn empty(max_transfer_blocks: Option<NonZero<u32>>) -> Self {
235        Self { mapping: None, max_transfer_blocks }
236    }
237
238    pub fn is_empty(&self) -> bool {
239        self.mapping.is_none()
240    }
241
242    fn mapping(&self) -> Option<&BlockOffsetMapping> {
243        self.mapping.as_ref()
244    }
245
246    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
247        self.max_transfer_blocks
248    }
249}
250
251// Methods take Arc<Self> rather than &self because of
252// https://github.com/rust-lang/rust/issues/42940.
253pub trait SessionManager: 'static {
254    type Session;
255
256    fn on_attach_vmo(
257        self: Arc<Self>,
258        vmo: &Arc<zx::Vmo>,
259    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
260
261    /// Creates a new session to handle `stream`.
262    /// The returned future should run until the session completes, for example when the client end
263    /// closes.
264    /// `offset_map`, will be used to adjust the block offset/length of FIFO requests.
265    fn open_session(
266        self: Arc<Self>,
267        stream: fblock::SessionRequestStream,
268        offset_map: OffsetMap,
269        block_size: u32,
270    ) -> impl Future<Output = Result<(), Error>> + Send;
271
272    /// Called to get block/partition information for Block::GetInfo, Partition::GetTypeGuid, etc.
273    fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;
274
275    /// Called to handle the GetVolumeInfo FIDL call.
276    fn get_volume_info(
277        &self,
278    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
279    {
280        async { Err(zx::Status::NOT_SUPPORTED) }
281    }
282
283    /// Called to handle the QuerySlices FIDL call.
284    fn query_slices(
285        &self,
286        _start_slices: &[u64],
287    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
288        async { Err(zx::Status::NOT_SUPPORTED) }
289    }
290
291    /// Called to handle the Shrink FIDL call.
292    fn extend(
293        &self,
294        _start_slice: u64,
295        _slice_count: u64,
296    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
297        async { Err(zx::Status::NOT_SUPPORTED) }
298    }
299
300    /// Called to handle the Shrink FIDL call.
301    fn shrink(
302        &self,
303        _start_slice: u64,
304        _slice_count: u64,
305    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
306        async { Err(zx::Status::NOT_SUPPORTED) }
307    }
308
309    /// Returns the active requests.
310    fn active_requests(&self) -> &ActiveRequests<Self::Session>;
311}
312
313pub trait IntoSessionManager {
314    type SM: SessionManager;
315
316    fn into_session_manager(self) -> Arc<Self::SM>;
317}
318
319impl<SM: SessionManager> BlockServer<SM> {
320    pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
321        Self { block_size, session_manager: session_manager.into_session_manager() }
322    }
323
324    pub fn session_manager(&self) -> &SM {
325        self.session_manager.as_ref()
326    }
327
328    /// Called to process requests for fuchsia.hardware.block.volume/Volume.
329    pub async fn handle_requests(
330        &self,
331        mut requests: fvolume::VolumeRequestStream,
332    ) -> Result<(), Error> {
333        let scope = fasync::Scope::new();
334        loop {
335            match requests.try_next().await {
336                Ok(Some(request)) => {
337                    if let Some(session) = self.handle_request(request).await? {
338                        scope.spawn(session.map(|_| ()));
339                    }
340                }
341                Ok(None) => break,
342                Err(err) => log::warn!(err:?; "Invalid request"),
343            }
344        }
345        scope.await;
346        Ok(())
347    }
348
349    /// Processes a partition request.  If a new session task is created in response to the request,
350    /// it is returned.
351    async fn handle_request(
352        &self,
353        request: fvolume::VolumeRequest,
354    ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send>, Error> {
355        match request {
356            fvolume::VolumeRequest::GetInfo { responder } => match self.device_info().await {
357                Ok(info) => {
358                    let max_transfer_size = info.max_transfer_size(self.block_size);
359                    let (block_count, flags) = match info.as_ref() {
360                        DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
361                            (*block_count, *device_flags)
362                        }
363                        DeviceInfo::Partition(partition_info) => {
364                            let block_count = if let Some(range) =
365                                partition_info.block_range.as_ref()
366                            {
367                                range.end - range.start
368                            } else {
369                                let volume_info = self.session_manager.get_volume_info().await?;
370                                volume_info.0.slice_size * volume_info.1.partition_slice_count
371                                    / self.block_size as u64
372                            };
373                            (block_count, partition_info.device_flags)
374                        }
375                    };
376                    responder.send(Ok(&fblock::BlockInfo {
377                        block_count,
378                        block_size: self.block_size,
379                        max_transfer_size,
380                        flags,
381                    }))?;
382                }
383                Err(status) => responder.send(Err(status.into_raw()))?,
384            },
385            fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
386                match self.device_info().await {
387                    Ok(info) => {
388                        return Ok(Some(self.session_manager.clone().open_session(
389                            session.into_stream(),
390                            OffsetMap::empty(info.max_transfer_blocks()),
391                            self.block_size,
392                        )));
393                    }
394                    Err(status) => session.close_with_epitaph(status)?,
395                }
396            }
397            fvolume::VolumeRequest::OpenSessionWithOffsetMap {
398                session,
399                mapping,
400                control_handle: _,
401            } => match self.device_info().await {
402                Ok(info) => {
403                    let initial_mapping: BlockOffsetMapping = match mapping.try_into() {
404                        Ok(m) => m,
405                        Err(status) => {
406                            session.close_with_epitaph(status)?;
407                            return Ok(None);
408                        }
409                    };
410                    if let Some(max) = info.block_count() {
411                        if initial_mapping.target_block_offset + initial_mapping.length > max {
412                            log::warn!(
413                                "Invalid mapping for session: {initial_mapping:?} (max {max})"
414                            );
415                            session.close_with_epitaph(zx::Status::INVALID_ARGS)?;
416                            return Ok(None);
417                        }
418                    }
419                    return Ok(Some(self.session_manager.clone().open_session(
420                        session.into_stream(),
421                        OffsetMap::new(initial_mapping, info.max_transfer_blocks()),
422                        self.block_size,
423                    )));
424                }
425                Err(status) => session.close_with_epitaph(status)?,
426            },
427            fvolume::VolumeRequest::GetTypeGuid { responder } => match self.device_info().await {
428                Ok(info) => {
429                    if let DeviceInfo::Partition(partition_info) = info.as_ref() {
430                        let mut guid =
431                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
432                        guid.value.copy_from_slice(&partition_info.type_guid);
433                        responder.send(zx::sys::ZX_OK, Some(&guid))?;
434                    } else {
435                        responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
436                    }
437                }
438                Err(status) => {
439                    responder.send(status.into_raw(), None)?;
440                }
441            },
442            fvolume::VolumeRequest::GetInstanceGuid { responder } => {
443                match self.device_info().await {
444                    Ok(info) => {
445                        if let DeviceInfo::Partition(partition_info) = info.as_ref() {
446                            let mut guid =
447                                fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
448                            guid.value.copy_from_slice(&partition_info.instance_guid);
449                            responder.send(zx::sys::ZX_OK, Some(&guid))?;
450                        } else {
451                            responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
452                        }
453                    }
454                    Err(status) => {
455                        responder.send(status.into_raw(), None)?;
456                    }
457                }
458            }
459            fvolume::VolumeRequest::GetName { responder } => match self.device_info().await {
460                Ok(info) => {
461                    if let DeviceInfo::Partition(partition_info) = info.as_ref() {
462                        responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
463                    } else {
464                        responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
465                    }
466                }
467                Err(status) => {
468                    responder.send(status.into_raw(), None)?;
469                }
470            },
471            fvolume::VolumeRequest::GetMetadata { responder } => match self.device_info().await {
472                Ok(info) => {
473                    if let DeviceInfo::Partition(info) = info.as_ref() {
474                        let mut type_guid =
475                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
476                        type_guid.value.copy_from_slice(&info.type_guid);
477                        let mut instance_guid =
478                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
479                        instance_guid.value.copy_from_slice(&info.instance_guid);
480                        responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
481                            name: Some(info.name.clone()),
482                            type_guid: Some(type_guid),
483                            instance_guid: Some(instance_guid),
484                            start_block_offset: info.block_range.as_ref().map(|range| range.start),
485                            num_blocks: info
486                                .block_range
487                                .as_ref()
488                                .map(|range| range.end - range.start),
489                            flags: Some(info.flags),
490                            ..Default::default()
491                        }))?;
492                    }
493                }
494                Err(status) => responder.send(Err(status.into_raw()))?,
495            },
496            fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
497                match self.session_manager.query_slices(&start_slices).await {
498                    Ok(mut results) => {
499                        let results_len = results.len();
500                        assert!(results_len <= 16);
501                        results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
502                        responder.send(
503                            zx::sys::ZX_OK,
504                            &results.try_into().unwrap(),
505                            results_len as u64,
506                        )?;
507                    }
508                    Err(s) => {
509                        responder.send(
510                            s.into_raw(),
511                            &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
512                            0,
513                        )?;
514                    }
515                }
516            }
517            fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
518                match self.session_manager.get_volume_info().await {
519                    Ok((manager_info, volume_info)) => {
520                        responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
521                    }
522                    Err(s) => responder.send(s.into_raw(), None, None)?,
523                }
524            }
525            fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
526                responder.send(
527                    zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
528                        .into_raw(),
529                )?;
530            }
531            fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
532                responder.send(
533                    zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
534                        .into_raw(),
535                )?;
536            }
537            fvolume::VolumeRequest::Destroy { responder, .. } => {
538                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
539            }
540        }
541        Ok(None)
542    }
543
544    async fn device_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
545        self.session_manager.get_info().await
546    }
547}
548
549struct SessionHelper<SM: SessionManager> {
550    session_manager: Arc<SM>,
551    offset_map: OffsetMap,
552    block_size: u32,
553    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
554    vmos: Mutex<BTreeMap<u16, Arc<zx::Vmo>>>,
555}
556
557impl<SM: SessionManager> SessionHelper<SM> {
558    fn new(
559        session_manager: Arc<SM>,
560        offset_map: OffsetMap,
561        block_size: u32,
562    ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
563        let (peer_fifo, fifo) = zx::Fifo::create(16)?;
564        Ok((
565            Self { session_manager, offset_map, block_size, peer_fifo, vmos: Mutex::default() },
566            fifo,
567        ))
568    }
569
570    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
571        match request {
572            fblock::SessionRequest::GetFifo { responder } => {
573                let rights = zx::Rights::TRANSFER
574                    | zx::Rights::READ
575                    | zx::Rights::WRITE
576                    | zx::Rights::SIGNAL
577                    | zx::Rights::WAIT;
578                match self.peer_fifo.duplicate_handle(rights) {
579                    Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
580                    Err(s) => responder.send(Err(s.into_raw()))?,
581                }
582                Ok(())
583            }
584            fblock::SessionRequest::AttachVmo { vmo, responder } => {
585                let vmo = Arc::new(vmo);
586                let vmo_id = {
587                    let mut vmos = self.vmos.lock();
588                    if vmos.len() == u16::MAX as usize {
589                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
590                        return Ok(());
591                    } else {
592                        let vmo_id = match vmos.last_entry() {
593                            None => 1,
594                            Some(o) => {
595                                o.key().checked_add(1).unwrap_or_else(|| {
596                                    let mut vmo_id = 1;
597                                    // Find the first gap...
598                                    for (&id, _) in &*vmos {
599                                        if id > vmo_id {
600                                            break;
601                                        }
602                                        vmo_id = id + 1;
603                                    }
604                                    vmo_id
605                                })
606                            }
607                        };
608                        vmos.insert(vmo_id, vmo.clone());
609                        vmo_id
610                    }
611                };
612                self.session_manager.clone().on_attach_vmo(&vmo).await?;
613                responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
614                Ok(())
615            }
616            fblock::SessionRequest::Close { responder } => {
617                responder.send(Ok(()))?;
618                Err(anyhow!("Closed"))
619            }
620        }
621    }
622
623    /// Decodes `request`.
624    fn decode_fifo_request(
625        &self,
626        session: SM::Session,
627        request: &BlockFifoRequest,
628    ) -> Result<DecodedRequest, Option<BlockFifoResponse>> {
629        let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
630        let mut operation = BlockOpcode::from_primitive(request.command.opcode)
631            .ok_or(zx::Status::INVALID_ARGS)
632            .and_then(|code| {
633                if matches!(code, BlockOpcode::Read | BlockOpcode::Write | BlockOpcode::Trim) {
634                    if request.length == 0 {
635                        return Err(zx::Status::INVALID_ARGS);
636                    }
637                    // Make sure the end offsets won't wrap.
638                    if request.dev_offset.checked_add(request.length as u64).is_none()
639                        || (code != BlockOpcode::Trim
640                            && (request.length as u64 * self.block_size as u64)
641                                .checked_add(request.vmo_offset)
642                                .is_none())
643                    {
644                        return Err(zx::Status::OUT_OF_RANGE);
645                    }
646                }
647                Ok(match code {
648                    BlockOpcode::Read => Operation::Read {
649                        device_block_offset: request.dev_offset,
650                        block_count: request.length,
651                        _unused: 0,
652                        vmo_offset: request
653                            .vmo_offset
654                            .checked_mul(self.block_size as u64)
655                            .ok_or(zx::Status::OUT_OF_RANGE)?,
656                    },
657                    BlockOpcode::Write => {
658                        let mut options = WriteOptions::default();
659                        if flags.contains(BlockIoFlag::FORCE_ACCESS) {
660                            options.flags |= WriteFlags::FORCE_ACCESS;
661                        }
662                        if flags.contains(BlockIoFlag::PRE_BARRIER) {
663                            options.flags |= WriteFlags::PRE_BARRIER;
664                        }
665                        Operation::Write {
666                            device_block_offset: request.dev_offset,
667                            block_count: request.length,
668                            _unused: 0,
669                            options: options,
670                            vmo_offset: request
671                                .vmo_offset
672                                .checked_mul(self.block_size as u64)
673                                .ok_or(zx::Status::OUT_OF_RANGE)?,
674                        }
675                    }
676                    BlockOpcode::Flush => Operation::Flush,
677                    BlockOpcode::Trim => Operation::Trim {
678                        device_block_offset: request.dev_offset,
679                        block_count: request.length,
680                    },
681                    BlockOpcode::CloseVmo => Operation::CloseVmo,
682                })
683            });
684
685        let group_or_request = if flags.contains(BlockIoFlag::GROUP_ITEM) {
686            GroupOrRequest::Group(request.group)
687        } else {
688            GroupOrRequest::Request(request.reqid)
689        };
690
691        let mut active_requests = self.session_manager.active_requests().0.lock();
692        let mut request_id = None;
693
694        // Multiple Block I/O request may be sent as a group.
695        // Notes:
696        // - the group is identified by the group id in the request
697        // - if using groups, a response will not be sent unless `BlockIoFlag::GROUP_LAST`
698        //   flag is set.
699        // - when processing a request of a group fails, subsequent requests of that
700        //   group will not be processed.
701        //
702        // Refer to sdk/fidl/fuchsia.hardware.block.driver/block.fidl for details.
703        if group_or_request.is_group() {
704            // Search for an existing entry that matches this group.  NOTE: This is a potentially
705            // expensive way to find a group (it's iterating over all slots in the active-requests
706            // slab).  This can be optimised easily should we need to.
707            for (key, group) in &mut active_requests.requests {
708                if group.group_or_request == group_or_request {
709                    if group.req_id.is_some() {
710                        // We have already received a request tagged as last.
711                        if group.status == zx::Status::OK {
712                            group.status = zx::Status::INVALID_ARGS;
713                        }
714                        // Ignore this request.
715                        return Err(None);
716                    }
717                    if flags.contains(BlockIoFlag::GROUP_LAST) {
718                        group.req_id = Some(request.reqid);
719                        // If the group has had an error, there is no point trying to issue this
720                        // request.
721                        if group.status != zx::Status::OK {
722                            operation = Err(group.status);
723                        }
724                    } else if group.status != zx::Status::OK {
725                        // The group has already encountered an error, so there is no point trying
726                        // to issue this request.
727                        return Err(None);
728                    }
729                    request_id = Some(RequestId(key));
730                    group.count += 1;
731                    break;
732                }
733            }
734        }
735
736        let mut retain_vmo = false;
737        let vmo = match &operation {
738            Ok(Operation::Read { .. } | Operation::Write { .. }) => {
739                self.vmos.lock().get(&request.vmoid).cloned().map_or(Err(zx::Status::IO), |vmo| {
740                    retain_vmo = true;
741                    Ok(Some(vmo))
742                })
743            }
744            Ok(Operation::CloseVmo) => {
745                self.vmos.lock().remove(&request.vmoid).map_or(Err(zx::Status::IO), |vmo| {
746                    active_requests.vmo_bin.add(vmo.clone());
747                    Ok(Some(vmo))
748                })
749            }
750            _ => Ok(None),
751        }
752        .unwrap_or_else(|e| {
753            operation = Err(e);
754            None
755        });
756
757        let trace_flow_id = NonZero::new(request.trace_flow_id);
758        let request_id = request_id.unwrap_or_else(|| {
759            let vmo_bin_key =
760                if retain_vmo { Some(active_requests.vmo_bin.retain()) } else { None };
761            RequestId(active_requests.requests.insert(ActiveRequest {
762                session,
763                group_or_request,
764                trace_flow_id,
765                vmo_bin_key,
766                status: zx::Status::OK,
767                count: 1,
768                req_id: if !flags.contains(BlockIoFlag::GROUP_ITEM)
769                    || flags.contains(BlockIoFlag::GROUP_LAST)
770                {
771                    Some(request.reqid)
772                } else {
773                    None
774                },
775            }))
776        });
777
778        Ok(DecodedRequest {
779            request_id,
780            trace_flow_id,
781            operation: operation.map_err(|status| {
782                active_requests.complete_and_take_response(request_id, status).map(|(_, r)| r)
783            })?,
784            vmo,
785        })
786    }
787
788    fn take_vmos(&self) -> BTreeMap<u16, Arc<zx::Vmo>> {
789        std::mem::take(&mut *self.vmos.lock())
790    }
791
792    /// Maps the request and returns the mapped request with an optional remainder.
793    fn map_request(
794        &self,
795        mut request: DecodedRequest,
796    ) -> Result<(DecodedRequest, Option<DecodedRequest>), Option<BlockFifoResponse>> {
797        let mut active_requests = self.session_manager.active_requests().0.lock();
798        let active_request = &mut active_requests.requests[request.request_id.0];
799        if active_request.status != zx::Status::OK {
800            return Err(active_requests
801                .complete_and_take_response(request.request_id, zx::Status::BAD_STATE)
802                .map(|(_, r)| r));
803        }
804        let mapping = self.offset_map.mapping();
805        match (mapping, request.operation.blocks()) {
806            (Some(mapping), Some(blocks)) if !mapping.are_blocks_within_source_range(blocks) => {
807                return Err(active_requests
808                    .complete_and_take_response(request.request_id, zx::Status::OUT_OF_RANGE)
809                    .map(|(_, r)| r));
810            }
811            _ => {}
812        }
813        let remainder = request.operation.map(
814            self.offset_map.mapping(),
815            self.offset_map.max_transfer_blocks(),
816            self.block_size,
817        );
818        if remainder.is_some() {
819            active_request.count += 1;
820        }
821        static CACHE: AtomicU64 = AtomicU64::new(0);
822        if let Some(context) =
823            fuchsia_trace::TraceCategoryContext::acquire_cached(c"storage", &CACHE)
824        {
825            use fuchsia_trace::ArgValue;
826            let trace_args = [
827                ArgValue::of("request_id", request.request_id.0),
828                ArgValue::of("opcode", request.operation.trace_label()),
829            ];
830            let _scope = fuchsia_trace::duration(
831                c"storage",
832                c"block_server::start_transaction",
833                &trace_args,
834            );
835            if let Some(trace_flow_id) = active_request.trace_flow_id {
836                fuchsia_trace::flow_step(
837                    &context,
838                    c"block_server::start_transaction",
839                    trace_flow_id.get().into(),
840                    &[],
841                );
842            }
843        }
844        let remainder = remainder.map(|operation| DecodedRequest { operation, ..request.clone() });
845        Ok((request, remainder))
846    }
847
848    fn drop_active_requests(&self, pred: impl Fn(&SM::Session) -> bool) {
849        self.session_manager.active_requests().0.lock().requests.retain(|_, r| !pred(&r.session));
850    }
851}
852
853#[repr(transparent)]
854#[derive(Clone, Copy, Debug, Eq, PartialEq, Hash)]
855pub struct RequestId(usize);
856
857#[derive(Clone, Debug)]
858struct DecodedRequest {
859    request_id: RequestId,
860    trace_flow_id: TraceFlowId,
861    operation: Operation,
862    vmo: Option<Arc<zx::Vmo>>,
863}
864
865/// cbindgen:no-export
866pub type WriteFlags = block_protocol::WriteFlags;
867pub type WriteOptions = block_protocol::WriteOptions;
868
869#[repr(C)]
870#[derive(Clone, Debug, PartialEq, Eq)]
871pub enum Operation {
872    // NOTE: On the C++ side, this ends up as a union and, for efficiency reasons, there is code
873    // that assumes that some fields for reads and writes (and possibly trim) line-up (e.g. common
874    // code can read `device_block_offset` from the read variant and then assume it's valid for the
875    // write variant).
876    Read {
877        device_block_offset: u64,
878        block_count: u32,
879        _unused: u32,
880        vmo_offset: u64,
881    },
882    Write {
883        device_block_offset: u64,
884        block_count: u32,
885        _unused: u32,
886        vmo_offset: u64,
887        options: WriteOptions,
888    },
889    Flush,
890    Trim {
891        device_block_offset: u64,
892        block_count: u32,
893    },
894    /// This will never be seen by the C interface.
895    CloseVmo,
896}
897
898impl Operation {
899    fn trace_label(&self) -> &'static str {
900        match self {
901            Operation::Read { .. } => "read",
902            Operation::Write { .. } => "write",
903            Operation::Flush { .. } => "flush",
904            Operation::Trim { .. } => "trim",
905            Operation::CloseVmo { .. } => "close_vmo",
906        }
907    }
908
909    /// Returns (offset, length).
910    fn blocks(&self) -> Option<(u64, u32)> {
911        match self {
912            Operation::Read { device_block_offset, block_count, .. }
913            | Operation::Write { device_block_offset, block_count, .. }
914            | Operation::Trim { device_block_offset, block_count, .. } => {
915                Some((*device_block_offset, *block_count))
916            }
917            _ => None,
918        }
919    }
920
921    /// Returns mutable references to (offset, length).
922    fn blocks_mut(&mut self) -> Option<(&mut u64, &mut u32)> {
923        match self {
924            Operation::Read { device_block_offset, block_count, .. }
925            | Operation::Write { device_block_offset, block_count, .. }
926            | Operation::Trim { device_block_offset, block_count, .. } => {
927                Some((device_block_offset, block_count))
928            }
929            _ => None,
930        }
931    }
932
933    /// Maps the operation using `mapping` and returns the remainder.  `mapping` *must* overlap the
934    /// start of the operation.
935    fn map(
936        &mut self,
937        mapping: Option<&BlockOffsetMapping>,
938        max_blocks: Option<NonZero<u32>>,
939        block_size: u32,
940    ) -> Option<Self> {
941        let mut max = match self {
942            Operation::Read { .. } | Operation::Write { .. } => max_blocks.map(|m| m.get() as u64),
943            _ => None,
944        };
945        let (offset, length) = self.blocks_mut()?;
946        let orig_offset = *offset;
947        if let Some(mapping) = mapping {
948            let delta = *offset - mapping.source_block_offset;
949            debug_assert!(*offset - mapping.source_block_offset < mapping.length);
950            *offset = mapping.target_block_offset + delta;
951            let mapping_max = mapping.target_block_offset + mapping.length - *offset;
952            max = match max {
953                None => Some(mapping_max),
954                Some(m) => Some(std::cmp::min(m, mapping_max)),
955            };
956        };
957        if let Some(max) = max {
958            if *length as u64 > max {
959                let rem = (*length as u64 - max) as u32;
960                *length = max as u32;
961                return Some(match self {
962                    Operation::Read {
963                        device_block_offset: _,
964                        block_count: _,
965                        vmo_offset,
966                        _unused,
967                    } => Operation::Read {
968                        device_block_offset: orig_offset + max,
969                        block_count: rem,
970                        vmo_offset: *vmo_offset + max * block_size as u64,
971                        _unused: *_unused,
972                    },
973                    Operation::Write {
974                        device_block_offset: _,
975                        block_count: _,
976                        _unused,
977                        vmo_offset,
978                        options,
979                    } => {
980                        // Only send the barrier flag once per write request.
981                        let new_options = WriteOptions {
982                            flags: options.flags & !WriteFlags::PRE_BARRIER,
983                            ..*options
984                        };
985
986                        Operation::Write {
987                            device_block_offset: orig_offset + max,
988                            block_count: rem,
989                            _unused: *_unused,
990                            vmo_offset: *vmo_offset + max * block_size as u64,
991                            options: new_options,
992                        }
993                    }
994                    Operation::Trim { device_block_offset: _, block_count: _ } => {
995                        Operation::Trim { device_block_offset: orig_offset + max, block_count: rem }
996                    }
997                    _ => unreachable!(),
998                });
999            }
1000        }
1001        None
1002    }
1003}
1004
1005#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd)]
1006pub enum GroupOrRequest {
1007    Group(u16),
1008    Request(u32),
1009}
1010
1011impl GroupOrRequest {
1012    fn is_group(&self) -> bool {
1013        matches!(self, Self::Group(_))
1014    }
1015
1016    fn group_id(&self) -> Option<u16> {
1017        match self {
1018            Self::Group(id) => Some(*id),
1019            Self::Request(_) => None,
1020        }
1021    }
1022}
1023
1024#[cfg(test)]
1025mod tests {
1026    use super::{
1027        BlockOffsetMapping, BlockServer, DeviceInfo, FIFO_MAX_REQUESTS, Operation, PartitionInfo,
1028        TraceFlowId,
1029    };
1030    use assert_matches::assert_matches;
1031    use block_protocol::{
1032        BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, WriteFlags, WriteOptions,
1033    };
1034    use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
1035    use fuchsia_sync::Mutex;
1036    use futures::FutureExt as _;
1037    use futures::channel::oneshot;
1038    use futures::future::BoxFuture;
1039    use std::borrow::Cow;
1040    use std::future::poll_fn;
1041    use std::num::NonZero;
1042    use std::pin::pin;
1043    use std::sync::Arc;
1044    use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
1045    use std::task::{Context, Poll};
1046    use zx::{AsHandleRef as _, HandleBased as _};
1047    use {
1048        fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
1049        fuchsia_async as fasync,
1050    };
1051
1052    #[derive(Default)]
1053    struct MockInterface {
1054        read_hook: Option<
1055            Box<
1056                dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
1057                    + Send
1058                    + Sync,
1059            >,
1060        >,
1061        write_hook:
1062            Option<Box<dyn Fn(u64) -> BoxFuture<'static, Result<(), zx::Status>> + Send + Sync>>,
1063        barrier_hook: Option<Box<dyn Fn() -> Result<(), zx::Status> + Send + Sync>>,
1064    }
1065
1066    impl super::async_interface::Interface for MockInterface {
1067        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1068            Ok(())
1069        }
1070
1071        async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1072            Ok(Cow::Owned(test_device_info()))
1073        }
1074
1075        async fn read(
1076            &self,
1077            device_block_offset: u64,
1078            block_count: u32,
1079            vmo: &Arc<zx::Vmo>,
1080            vmo_offset: u64,
1081            _trace_flow_id: TraceFlowId,
1082        ) -> Result<(), zx::Status> {
1083            if let Some(read_hook) = &self.read_hook {
1084                read_hook(device_block_offset, block_count, vmo, vmo_offset).await
1085            } else {
1086                unimplemented!();
1087            }
1088        }
1089
1090        async fn write(
1091            &self,
1092            device_block_offset: u64,
1093            _block_count: u32,
1094            _vmo: &Arc<zx::Vmo>,
1095            _vmo_offset: u64,
1096            _opts: WriteOptions,
1097            _trace_flow_id: TraceFlowId,
1098        ) -> Result<(), zx::Status> {
1099            if let Some(write_hook) = &self.write_hook {
1100                write_hook(device_block_offset).await
1101            } else {
1102                unimplemented!();
1103            }
1104        }
1105
1106        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1107            unreachable!();
1108        }
1109
1110        fn barrier(&self) -> Result<(), zx::Status> {
1111            if let Some(barrier_hook) = &self.barrier_hook {
1112                barrier_hook()
1113            } else {
1114                unimplemented!();
1115            }
1116        }
1117
1118        async fn trim(
1119            &self,
1120            _device_block_offset: u64,
1121            _block_count: u32,
1122            _trace_flow_id: TraceFlowId,
1123        ) -> Result<(), zx::Status> {
1124            unreachable!();
1125        }
1126
1127        async fn get_volume_info(
1128            &self,
1129        ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
1130            // Hang forever for the test_requests_dont_block_sessions test.
1131            let () = std::future::pending().await;
1132            unreachable!();
1133        }
1134    }
1135
1136    const BLOCK_SIZE: u32 = 512;
1137    const MAX_TRANSFER_BLOCKS: u32 = 10;
1138
1139    fn test_device_info() -> DeviceInfo {
1140        DeviceInfo::Partition(PartitionInfo {
1141            device_flags: fblock::Flag::READONLY,
1142            max_transfer_blocks: NonZero::new(MAX_TRANSFER_BLOCKS),
1143            block_range: Some(0..100),
1144            type_guid: [1; 16],
1145            instance_guid: [2; 16],
1146            name: "foo".to_string(),
1147            flags: 0xabcd,
1148        })
1149    }
1150
1151    #[fuchsia::test]
1152    async fn test_split_request_only_issues_single_barrier() {
1153        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1154        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1155        let barrier_called = Arc::new(AtomicU32::new(0));
1156        let barrier_called_clone = barrier_called.clone();
1157
1158        futures::join!(
1159            async move {
1160                let block_server = BlockServer::new(
1161                    BLOCK_SIZE,
1162                    Arc::new(MockInterface {
1163                        barrier_hook: Some(Box::new(move || {
1164                            barrier_called_clone.fetch_add(1, Ordering::Relaxed);
1165                            Ok(())
1166                        })),
1167                        write_hook: Some(Box::new(move |_device_block_offset| {
1168                            Box::pin(async move { Ok(()) })
1169                        })),
1170                        ..MockInterface::default()
1171                    }),
1172                );
1173                block_server.handle_requests(stream).await.unwrap();
1174            },
1175            async move {
1176                let (session_proxy, server) = fidl::endpoints::create_proxy();
1177
1178                proxy.open_session(server).unwrap();
1179
1180                let vmo_id = session_proxy
1181                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1182                    .await
1183                    .unwrap()
1184                    .unwrap();
1185                assert_ne!(vmo_id.id, 0);
1186
1187                let mut fifo =
1188                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1189                let (mut reader, mut writer) = fifo.async_io();
1190                // Set length to MAX_TRANSFER_BLOCKS + 1 to ensure the write is split across two
1191                // sub requests.
1192                writer
1193                    .write_entries(&BlockFifoRequest {
1194                        command: BlockFifoCommand {
1195                            opcode: BlockOpcode::Write.into_primitive(),
1196                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1197                            ..Default::default()
1198                        },
1199                        vmoid: vmo_id.id,
1200                        dev_offset: 0,
1201                        length: MAX_TRANSFER_BLOCKS + 1,
1202                        vmo_offset: 6,
1203                        ..Default::default()
1204                    })
1205                    .await
1206                    .unwrap();
1207
1208                let mut response = BlockFifoResponse::default();
1209                reader.read_entries(&mut response).await.unwrap();
1210                assert_eq!(response.status, zx::sys::ZX_OK);
1211
1212                assert_eq!(barrier_called.load(Ordering::Relaxed), 1);
1213
1214                std::mem::drop(proxy);
1215            }
1216        );
1217    }
1218
1219    #[fuchsia::test]
1220    async fn test_barriers_not_supported() {
1221        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1222        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1223
1224        futures::join!(
1225            async move {
1226                let block_server = BlockServer::new(
1227                    BLOCK_SIZE,
1228                    Arc::new(MockInterface {
1229                        barrier_hook: Some(Box::new(move || Err(zx::Status::NOT_SUPPORTED))),
1230                        write_hook: Some(Box::new(move |_device_block_offset| {
1231                            Box::pin(async move { Ok(()) })
1232                        })),
1233                        ..MockInterface::default()
1234                    }),
1235                );
1236                block_server.handle_requests(stream).await.unwrap();
1237            },
1238            async move {
1239                let (session_proxy, server) = fidl::endpoints::create_proxy();
1240
1241                proxy.open_session(server).unwrap();
1242
1243                let vmo_id = session_proxy
1244                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1245                    .await
1246                    .unwrap()
1247                    .unwrap();
1248                assert_ne!(vmo_id.id, 0);
1249
1250                let mut fifo =
1251                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1252                let (mut reader, mut writer) = fifo.async_io();
1253
1254                // Set length to MAX_TRANSFER_BLOCKS + 1 to ensure the write is split across two
1255                // sub requests.
1256                writer
1257                    .write_entries(&BlockFifoRequest {
1258                        command: BlockFifoCommand {
1259                            opcode: BlockOpcode::Write.into_primitive(),
1260                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1261                            ..Default::default()
1262                        },
1263                        vmoid: vmo_id.id,
1264                        dev_offset: 0,
1265                        length: MAX_TRANSFER_BLOCKS + 1,
1266                        vmo_offset: 6,
1267                        ..Default::default()
1268                    })
1269                    .await
1270                    .unwrap();
1271
1272                let mut response = BlockFifoResponse::default();
1273                reader.read_entries(&mut response).await.unwrap();
1274                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1275
1276                std::mem::drop(proxy);
1277            }
1278        );
1279    }
1280
1281    #[fuchsia::test]
1282    async fn test_barriers_ordering() {
1283        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1284        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1285        let barrier_called = Arc::new(AtomicBool::new(false));
1286
1287        futures::join!(
1288            async move {
1289                let barrier_called_clone = barrier_called.clone();
1290                let block_server = BlockServer::new(
1291                    BLOCK_SIZE,
1292                    Arc::new(MockInterface {
1293                        barrier_hook: Some(Box::new(move || {
1294                            barrier_called.store(true, Ordering::Relaxed);
1295                            Ok(())
1296                        })),
1297                        write_hook: Some(Box::new(move |device_block_offset| {
1298                            let barrier_called = barrier_called_clone.clone();
1299                            Box::pin(async move {
1300                                // The sleep allows the server to reorder the fifo requests.
1301                                if device_block_offset % 2 == 0 {
1302                                    fasync::Timer::new(fasync::MonotonicInstant::after(
1303                                        zx::MonotonicDuration::from_millis(200),
1304                                    ))
1305                                    .await;
1306                                }
1307                                assert!(barrier_called.load(Ordering::Relaxed));
1308                                Ok(())
1309                            })
1310                        })),
1311                        ..MockInterface::default()
1312                    }),
1313                );
1314                block_server.handle_requests(stream).await.unwrap();
1315            },
1316            async move {
1317                let (session_proxy, server) = fidl::endpoints::create_proxy();
1318
1319                proxy.open_session(server).unwrap();
1320
1321                let vmo_id = session_proxy
1322                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1323                    .await
1324                    .unwrap()
1325                    .unwrap();
1326                assert_ne!(vmo_id.id, 0);
1327
1328                let mut fifo =
1329                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1330                let (mut reader, mut writer) = fifo.async_io();
1331
1332                writer
1333                    .write_entries(&BlockFifoRequest {
1334                        command: BlockFifoCommand {
1335                            opcode: BlockOpcode::Write.into_primitive(),
1336                            flags: BlockIoFlag::PRE_BARRIER.bits(),
1337                            ..Default::default()
1338                        },
1339                        vmoid: vmo_id.id,
1340                        dev_offset: 0,
1341                        length: 5,
1342                        vmo_offset: 6,
1343                        ..Default::default()
1344                    })
1345                    .await
1346                    .unwrap();
1347
1348                for i in 0..10 {
1349                    writer
1350                        .write_entries(&BlockFifoRequest {
1351                            command: BlockFifoCommand {
1352                                opcode: BlockOpcode::Write.into_primitive(),
1353                                ..Default::default()
1354                            },
1355                            vmoid: vmo_id.id,
1356                            dev_offset: i + 1,
1357                            length: 5,
1358                            vmo_offset: 6,
1359                            ..Default::default()
1360                        })
1361                        .await
1362                        .unwrap();
1363                }
1364                for _ in 0..11 {
1365                    let mut response = BlockFifoResponse::default();
1366                    reader.read_entries(&mut response).await.unwrap();
1367                    assert_eq!(response.status, zx::sys::ZX_OK);
1368                }
1369
1370                std::mem::drop(proxy);
1371            }
1372        );
1373    }
1374
1375    #[fuchsia::test]
1376    async fn test_info() {
1377        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1378
1379        futures::join!(
1380            async {
1381                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1382                block_server.handle_requests(stream).await.unwrap();
1383            },
1384            async {
1385                let expected_info = test_device_info();
1386                let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1387                    info
1388                } else {
1389                    unreachable!()
1390                };
1391
1392                let block_info = proxy.get_info().await.unwrap().unwrap();
1393                assert_eq!(
1394                    block_info.block_count,
1395                    partition_info.block_range.as_ref().unwrap().end
1396                        - partition_info.block_range.as_ref().unwrap().start
1397                );
1398                assert_eq!(block_info.flags, fblock::Flag::READONLY);
1399
1400                assert_eq!(block_info.max_transfer_size, MAX_TRANSFER_BLOCKS * BLOCK_SIZE);
1401
1402                let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1403                assert_eq!(status, zx::sys::ZX_OK);
1404                assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1405
1406                let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1407                assert_eq!(status, zx::sys::ZX_OK);
1408                assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1409
1410                let (status, name) = proxy.get_name().await.unwrap();
1411                assert_eq!(status, zx::sys::ZX_OK);
1412                assert_eq!(name.as_ref(), Some(&partition_info.name));
1413
1414                let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1415                assert_eq!(metadata.name, name);
1416                assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1417                assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1418                assert_eq!(
1419                    metadata.start_block_offset,
1420                    Some(partition_info.block_range.as_ref().unwrap().start)
1421                );
1422                assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1423                assert_eq!(metadata.flags, Some(partition_info.flags));
1424
1425                std::mem::drop(proxy);
1426            }
1427        );
1428    }
1429
1430    #[fuchsia::test]
1431    async fn test_attach_vmo() {
1432        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1433
1434        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1435        let koid = vmo.get_koid().unwrap();
1436
1437        futures::join!(
1438            async {
1439                let block_server = BlockServer::new(
1440                    BLOCK_SIZE,
1441                    Arc::new(MockInterface {
1442                        read_hook: Some(Box::new(move |_, _, vmo, _| {
1443                            assert_eq!(vmo.get_koid().unwrap(), koid);
1444                            Box::pin(async { Ok(()) })
1445                        })),
1446                        ..MockInterface::default()
1447                    }),
1448                );
1449                block_server.handle_requests(stream).await.unwrap();
1450            },
1451            async move {
1452                let (session_proxy, server) = fidl::endpoints::create_proxy();
1453
1454                proxy.open_session(server).unwrap();
1455
1456                let vmo_id = session_proxy
1457                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1458                    .await
1459                    .unwrap()
1460                    .unwrap();
1461                assert_ne!(vmo_id.id, 0);
1462
1463                let mut fifo =
1464                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1465                let (mut reader, mut writer) = fifo.async_io();
1466
1467                // Keep attaching VMOs until we eventually hit the maximum.
1468                let mut count = 1;
1469                loop {
1470                    match session_proxy
1471                        .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1472                        .await
1473                        .unwrap()
1474                    {
1475                        Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1476                        Err(e) => {
1477                            assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1478                            break;
1479                        }
1480                    }
1481
1482                    // Only test every 10 to keep test time down.
1483                    if count % 10 == 0 {
1484                        writer
1485                            .write_entries(&BlockFifoRequest {
1486                                command: BlockFifoCommand {
1487                                    opcode: BlockOpcode::Read.into_primitive(),
1488                                    ..Default::default()
1489                                },
1490                                vmoid: vmo_id.id,
1491                                length: 1,
1492                                ..Default::default()
1493                            })
1494                            .await
1495                            .unwrap();
1496
1497                        let mut response = BlockFifoResponse::default();
1498                        reader.read_entries(&mut response).await.unwrap();
1499                        assert_eq!(response.status, zx::sys::ZX_OK);
1500                    }
1501
1502                    count += 1;
1503                }
1504
1505                assert_eq!(count, u16::MAX as u64);
1506
1507                // Detach the original VMO, and make sure we can then attach another one.
1508                writer
1509                    .write_entries(&BlockFifoRequest {
1510                        command: BlockFifoCommand {
1511                            opcode: BlockOpcode::CloseVmo.into_primitive(),
1512                            ..Default::default()
1513                        },
1514                        vmoid: vmo_id.id,
1515                        ..Default::default()
1516                    })
1517                    .await
1518                    .unwrap();
1519
1520                let mut response = BlockFifoResponse::default();
1521                reader.read_entries(&mut response).await.unwrap();
1522                assert_eq!(response.status, zx::sys::ZX_OK);
1523
1524                let new_vmo_id = session_proxy
1525                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1526                    .await
1527                    .unwrap()
1528                    .unwrap();
1529                // It should reuse the same ID.
1530                assert_eq!(new_vmo_id.id, vmo_id.id);
1531
1532                std::mem::drop(proxy);
1533            }
1534        );
1535    }
1536
1537    #[fuchsia::test]
1538    async fn test_close() {
1539        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1540
1541        let mut server = std::pin::pin!(
1542            async {
1543                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1544                block_server.handle_requests(stream).await.unwrap();
1545            }
1546            .fuse()
1547        );
1548
1549        let mut client = std::pin::pin!(
1550            async {
1551                let (session_proxy, server) = fidl::endpoints::create_proxy();
1552
1553                proxy.open_session(server).unwrap();
1554
1555                // Dropping the proxy should not cause the session to terminate because the session is
1556                // still live.
1557                std::mem::drop(proxy);
1558
1559                session_proxy.close().await.unwrap().unwrap();
1560
1561                // Keep the session alive.  Calling `close` should cause the server to terminate.
1562                let _: () = std::future::pending().await;
1563            }
1564            .fuse()
1565        );
1566
1567        futures::select!(
1568            _ = server => {}
1569            _ = client => unreachable!(),
1570        );
1571    }
1572
1573    #[derive(Default)]
1574    struct IoMockInterface {
1575        do_checks: bool,
1576        expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1577        return_errors: bool,
1578    }
1579
1580    #[derive(Debug)]
1581    enum ExpectedOp {
1582        Read(u64, u32, u64),
1583        Write(u64, u32, u64),
1584        Trim(u64, u32),
1585        Flush,
1586    }
1587
1588    impl super::async_interface::Interface for IoMockInterface {
1589        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1590            Ok(())
1591        }
1592
1593        async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1594            Ok(Cow::Owned(test_device_info()))
1595        }
1596
1597        async fn read(
1598            &self,
1599            device_block_offset: u64,
1600            block_count: u32,
1601            _vmo: &Arc<zx::Vmo>,
1602            vmo_offset: u64,
1603            _trace_flow_id: TraceFlowId,
1604        ) -> Result<(), zx::Status> {
1605            if self.return_errors {
1606                Err(zx::Status::INTERNAL)
1607            } else {
1608                if self.do_checks {
1609                    assert_matches!(
1610                        self.expected_op.lock().take(),
1611                        Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1612                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1613                        "Read {device_block_offset} {block_count} {vmo_offset}"
1614                    );
1615                }
1616                Ok(())
1617            }
1618        }
1619
1620        async fn write(
1621            &self,
1622            device_block_offset: u64,
1623            block_count: u32,
1624            _vmo: &Arc<zx::Vmo>,
1625            vmo_offset: u64,
1626            _opts: WriteOptions,
1627            _trace_flow_id: TraceFlowId,
1628        ) -> Result<(), zx::Status> {
1629            if self.return_errors {
1630                Err(zx::Status::NOT_SUPPORTED)
1631            } else {
1632                if self.do_checks {
1633                    assert_matches!(
1634                        self.expected_op.lock().take(),
1635                        Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1636                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1637                        "Write {device_block_offset} {block_count} {vmo_offset}"
1638                    );
1639                }
1640                Ok(())
1641            }
1642        }
1643
1644        async fn flush(&self, _trace_flow_id: TraceFlowId) -> Result<(), zx::Status> {
1645            if self.return_errors {
1646                Err(zx::Status::NO_RESOURCES)
1647            } else {
1648                if self.do_checks {
1649                    assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1650                }
1651                Ok(())
1652            }
1653        }
1654
1655        fn barrier(&self) -> Result<(), zx::Status> {
1656            unreachable!()
1657        }
1658
1659        async fn trim(
1660            &self,
1661            device_block_offset: u64,
1662            block_count: u32,
1663            _trace_flow_id: TraceFlowId,
1664        ) -> Result<(), zx::Status> {
1665            if self.return_errors {
1666                Err(zx::Status::NO_MEMORY)
1667            } else {
1668                if self.do_checks {
1669                    assert_matches!(
1670                        self.expected_op.lock().take(),
1671                        Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1672                            block_count == b,
1673                        "Trim {device_block_offset} {block_count}"
1674                    );
1675                }
1676                Ok(())
1677            }
1678        }
1679    }
1680
1681    #[fuchsia::test]
1682    async fn test_io() {
1683        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1684
1685        let expected_op = Arc::new(Mutex::new(None));
1686        let expected_op_clone = expected_op.clone();
1687
1688        let server = async {
1689            let block_server = BlockServer::new(
1690                BLOCK_SIZE,
1691                Arc::new(IoMockInterface {
1692                    return_errors: false,
1693                    do_checks: true,
1694                    expected_op: expected_op_clone,
1695                }),
1696            );
1697            block_server.handle_requests(stream).await.unwrap();
1698        };
1699
1700        let client = async move {
1701            let (session_proxy, server) = fidl::endpoints::create_proxy();
1702
1703            proxy.open_session(server).unwrap();
1704
1705            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1706            let vmo_id = session_proxy
1707                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1708                .await
1709                .unwrap()
1710                .unwrap();
1711
1712            let mut fifo =
1713                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1714            let (mut reader, mut writer) = fifo.async_io();
1715
1716            // READ
1717            *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1718            writer
1719                .write_entries(&BlockFifoRequest {
1720                    command: BlockFifoCommand {
1721                        opcode: BlockOpcode::Read.into_primitive(),
1722                        ..Default::default()
1723                    },
1724                    vmoid: vmo_id.id,
1725                    dev_offset: 1,
1726                    length: 2,
1727                    vmo_offset: 3,
1728                    ..Default::default()
1729                })
1730                .await
1731                .unwrap();
1732
1733            let mut response = BlockFifoResponse::default();
1734            reader.read_entries(&mut response).await.unwrap();
1735            assert_eq!(response.status, zx::sys::ZX_OK);
1736
1737            // WRITE
1738            *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1739            writer
1740                .write_entries(&BlockFifoRequest {
1741                    command: BlockFifoCommand {
1742                        opcode: BlockOpcode::Write.into_primitive(),
1743                        ..Default::default()
1744                    },
1745                    vmoid: vmo_id.id,
1746                    dev_offset: 4,
1747                    length: 5,
1748                    vmo_offset: 6,
1749                    ..Default::default()
1750                })
1751                .await
1752                .unwrap();
1753
1754            let mut response = BlockFifoResponse::default();
1755            reader.read_entries(&mut response).await.unwrap();
1756            assert_eq!(response.status, zx::sys::ZX_OK);
1757
1758            // FLUSH
1759            *expected_op.lock() = Some(ExpectedOp::Flush);
1760            writer
1761                .write_entries(&BlockFifoRequest {
1762                    command: BlockFifoCommand {
1763                        opcode: BlockOpcode::Flush.into_primitive(),
1764                        ..Default::default()
1765                    },
1766                    ..Default::default()
1767                })
1768                .await
1769                .unwrap();
1770
1771            reader.read_entries(&mut response).await.unwrap();
1772            assert_eq!(response.status, zx::sys::ZX_OK);
1773
1774            // TRIM
1775            *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1776            writer
1777                .write_entries(&BlockFifoRequest {
1778                    command: BlockFifoCommand {
1779                        opcode: BlockOpcode::Trim.into_primitive(),
1780                        ..Default::default()
1781                    },
1782                    dev_offset: 7,
1783                    length: 8,
1784                    ..Default::default()
1785                })
1786                .await
1787                .unwrap();
1788
1789            reader.read_entries(&mut response).await.unwrap();
1790            assert_eq!(response.status, zx::sys::ZX_OK);
1791
1792            std::mem::drop(proxy);
1793        };
1794
1795        futures::join!(server, client);
1796    }
1797
1798    #[fuchsia::test]
1799    async fn test_io_errors() {
1800        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1801
1802        futures::join!(
1803            async {
1804                let block_server = BlockServer::new(
1805                    BLOCK_SIZE,
1806                    Arc::new(IoMockInterface {
1807                        return_errors: true,
1808                        do_checks: false,
1809                        expected_op: Arc::new(Mutex::new(None)),
1810                    }),
1811                );
1812                block_server.handle_requests(stream).await.unwrap();
1813            },
1814            async move {
1815                let (session_proxy, server) = fidl::endpoints::create_proxy();
1816
1817                proxy.open_session(server).unwrap();
1818
1819                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1820                let vmo_id = session_proxy
1821                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1822                    .await
1823                    .unwrap()
1824                    .unwrap();
1825
1826                let mut fifo =
1827                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1828                let (mut reader, mut writer) = fifo.async_io();
1829
1830                // READ
1831                writer
1832                    .write_entries(&BlockFifoRequest {
1833                        command: BlockFifoCommand {
1834                            opcode: BlockOpcode::Read.into_primitive(),
1835                            ..Default::default()
1836                        },
1837                        vmoid: vmo_id.id,
1838                        length: 1,
1839                        reqid: 1,
1840                        ..Default::default()
1841                    })
1842                    .await
1843                    .unwrap();
1844
1845                let mut response = BlockFifoResponse::default();
1846                reader.read_entries(&mut response).await.unwrap();
1847                assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1848
1849                // WRITE
1850                writer
1851                    .write_entries(&BlockFifoRequest {
1852                        command: BlockFifoCommand {
1853                            opcode: BlockOpcode::Write.into_primitive(),
1854                            ..Default::default()
1855                        },
1856                        vmoid: vmo_id.id,
1857                        length: 1,
1858                        reqid: 2,
1859                        ..Default::default()
1860                    })
1861                    .await
1862                    .unwrap();
1863
1864                reader.read_entries(&mut response).await.unwrap();
1865                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1866
1867                // FLUSH
1868                writer
1869                    .write_entries(&BlockFifoRequest {
1870                        command: BlockFifoCommand {
1871                            opcode: BlockOpcode::Flush.into_primitive(),
1872                            ..Default::default()
1873                        },
1874                        reqid: 3,
1875                        ..Default::default()
1876                    })
1877                    .await
1878                    .unwrap();
1879
1880                reader.read_entries(&mut response).await.unwrap();
1881                assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
1882
1883                // TRIM
1884                writer
1885                    .write_entries(&BlockFifoRequest {
1886                        command: BlockFifoCommand {
1887                            opcode: BlockOpcode::Trim.into_primitive(),
1888                            ..Default::default()
1889                        },
1890                        reqid: 4,
1891                        length: 1,
1892                        ..Default::default()
1893                    })
1894                    .await
1895                    .unwrap();
1896
1897                reader.read_entries(&mut response).await.unwrap();
1898                assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
1899
1900                std::mem::drop(proxy);
1901            }
1902        );
1903    }
1904
1905    #[fuchsia::test]
1906    async fn test_invalid_args() {
1907        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1908
1909        futures::join!(
1910            async {
1911                let block_server = BlockServer::new(
1912                    BLOCK_SIZE,
1913                    Arc::new(IoMockInterface {
1914                        return_errors: false,
1915                        do_checks: false,
1916                        expected_op: Arc::new(Mutex::new(None)),
1917                    }),
1918                );
1919                block_server.handle_requests(stream).await.unwrap();
1920            },
1921            async move {
1922                let (session_proxy, server) = fidl::endpoints::create_proxy();
1923
1924                proxy.open_session(server).unwrap();
1925
1926                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1927                let vmo_id = session_proxy
1928                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1929                    .await
1930                    .unwrap()
1931                    .unwrap();
1932
1933                let mut fifo =
1934                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1935
1936                async fn test(
1937                    fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
1938                    request: BlockFifoRequest,
1939                ) -> Result<(), zx::Status> {
1940                    let (mut reader, mut writer) = fifo.async_io();
1941                    writer.write_entries(&request).await.unwrap();
1942                    let mut response = BlockFifoResponse::default();
1943                    reader.read_entries(&mut response).await.unwrap();
1944                    zx::Status::ok(response.status)
1945                }
1946
1947                // READ
1948
1949                let good_read_request = || BlockFifoRequest {
1950                    command: BlockFifoCommand {
1951                        opcode: BlockOpcode::Read.into_primitive(),
1952                        ..Default::default()
1953                    },
1954                    length: 1,
1955                    vmoid: vmo_id.id,
1956                    ..Default::default()
1957                };
1958
1959                assert_eq!(
1960                    test(
1961                        &mut fifo,
1962                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
1963                    )
1964                    .await,
1965                    Err(zx::Status::IO)
1966                );
1967
1968                assert_eq!(
1969                    test(
1970                        &mut fifo,
1971                        BlockFifoRequest {
1972                            vmo_offset: 0xffff_ffff_ffff_ffff,
1973                            ..good_read_request()
1974                        }
1975                    )
1976                    .await,
1977                    Err(zx::Status::OUT_OF_RANGE)
1978                );
1979
1980                assert_eq!(
1981                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_read_request() }).await,
1982                    Err(zx::Status::INVALID_ARGS)
1983                );
1984
1985                // WRITE
1986
1987                let good_write_request = || BlockFifoRequest {
1988                    command: BlockFifoCommand {
1989                        opcode: BlockOpcode::Write.into_primitive(),
1990                        ..Default::default()
1991                    },
1992                    length: 1,
1993                    vmoid: vmo_id.id,
1994                    ..Default::default()
1995                };
1996
1997                assert_eq!(
1998                    test(
1999                        &mut fifo,
2000                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
2001                    )
2002                    .await,
2003                    Err(zx::Status::IO)
2004                );
2005
2006                assert_eq!(
2007                    test(
2008                        &mut fifo,
2009                        BlockFifoRequest {
2010                            vmo_offset: 0xffff_ffff_ffff_ffff,
2011                            ..good_write_request()
2012                        }
2013                    )
2014                    .await,
2015                    Err(zx::Status::OUT_OF_RANGE)
2016                );
2017
2018                assert_eq!(
2019                    test(&mut fifo, BlockFifoRequest { length: 0, ..good_write_request() }).await,
2020                    Err(zx::Status::INVALID_ARGS)
2021                );
2022
2023                // CLOSE VMO
2024
2025                assert_eq!(
2026                    test(
2027                        &mut fifo,
2028                        BlockFifoRequest {
2029                            command: BlockFifoCommand {
2030                                opcode: BlockOpcode::CloseVmo.into_primitive(),
2031                                ..Default::default()
2032                            },
2033                            vmoid: vmo_id.id + 1,
2034                            ..Default::default()
2035                        }
2036                    )
2037                    .await,
2038                    Err(zx::Status::IO)
2039                );
2040
2041                std::mem::drop(proxy);
2042            }
2043        );
2044    }
2045
2046    #[fuchsia::test]
2047    async fn test_concurrent_requests() {
2048        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2049
2050        let waiting_readers = Arc::new(Mutex::new(Vec::new()));
2051        let waiting_readers_clone = waiting_readers.clone();
2052
2053        futures::join!(
2054            async move {
2055                let block_server = BlockServer::new(
2056                    BLOCK_SIZE,
2057                    Arc::new(MockInterface {
2058                        read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
2059                            let (tx, rx) = oneshot::channel();
2060                            waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
2061                            Box::pin(async move {
2062                                let _ = rx.await;
2063                                Ok(())
2064                            })
2065                        })),
2066                        ..MockInterface::default()
2067                    }),
2068                );
2069                block_server.handle_requests(stream).await.unwrap();
2070            },
2071            async move {
2072                let (session_proxy, server) = fidl::endpoints::create_proxy();
2073
2074                proxy.open_session(server).unwrap();
2075
2076                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2077                let vmo_id = session_proxy
2078                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2079                    .await
2080                    .unwrap()
2081                    .unwrap();
2082
2083                let mut fifo =
2084                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2085                let (mut reader, mut writer) = fifo.async_io();
2086
2087                writer
2088                    .write_entries(&BlockFifoRequest {
2089                        command: BlockFifoCommand {
2090                            opcode: BlockOpcode::Read.into_primitive(),
2091                            ..Default::default()
2092                        },
2093                        reqid: 1,
2094                        dev_offset: 1, // Intentionally use the same as `reqid`.
2095                        vmoid: vmo_id.id,
2096                        length: 1,
2097                        ..Default::default()
2098                    })
2099                    .await
2100                    .unwrap();
2101
2102                writer
2103                    .write_entries(&BlockFifoRequest {
2104                        command: BlockFifoCommand {
2105                            opcode: BlockOpcode::Read.into_primitive(),
2106                            ..Default::default()
2107                        },
2108                        reqid: 2,
2109                        dev_offset: 2,
2110                        vmoid: vmo_id.id,
2111                        length: 1,
2112                        ..Default::default()
2113                    })
2114                    .await
2115                    .unwrap();
2116
2117                // Wait till both those entries are pending.
2118                poll_fn(|cx: &mut Context<'_>| {
2119                    if waiting_readers.lock().len() == 2 {
2120                        Poll::Ready(())
2121                    } else {
2122                        // Yield to the executor.
2123                        cx.waker().wake_by_ref();
2124                        Poll::Pending
2125                    }
2126                })
2127                .await;
2128
2129                let mut response = BlockFifoResponse::default();
2130                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2131
2132                let (id, tx) = waiting_readers.lock().pop().unwrap();
2133                tx.send(()).unwrap();
2134
2135                reader.read_entries(&mut response).await.unwrap();
2136                assert_eq!(response.status, zx::sys::ZX_OK);
2137                assert_eq!(response.reqid, id);
2138
2139                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2140
2141                let (id, tx) = waiting_readers.lock().pop().unwrap();
2142                tx.send(()).unwrap();
2143
2144                reader.read_entries(&mut response).await.unwrap();
2145                assert_eq!(response.status, zx::sys::ZX_OK);
2146                assert_eq!(response.reqid, id);
2147            }
2148        );
2149    }
2150
2151    #[fuchsia::test]
2152    async fn test_groups() {
2153        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2154
2155        futures::join!(
2156            async move {
2157                let block_server = BlockServer::new(
2158                    BLOCK_SIZE,
2159                    Arc::new(MockInterface {
2160                        read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
2161                        ..MockInterface::default()
2162                    }),
2163                );
2164                block_server.handle_requests(stream).await.unwrap();
2165            },
2166            async move {
2167                let (session_proxy, server) = fidl::endpoints::create_proxy();
2168
2169                proxy.open_session(server).unwrap();
2170
2171                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2172                let vmo_id = session_proxy
2173                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2174                    .await
2175                    .unwrap()
2176                    .unwrap();
2177
2178                let mut fifo =
2179                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2180                let (mut reader, mut writer) = fifo.async_io();
2181
2182                writer
2183                    .write_entries(&BlockFifoRequest {
2184                        command: BlockFifoCommand {
2185                            opcode: BlockOpcode::Read.into_primitive(),
2186                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2187                            ..Default::default()
2188                        },
2189                        group: 1,
2190                        vmoid: vmo_id.id,
2191                        length: 1,
2192                        ..Default::default()
2193                    })
2194                    .await
2195                    .unwrap();
2196
2197                writer
2198                    .write_entries(&BlockFifoRequest {
2199                        command: BlockFifoCommand {
2200                            opcode: BlockOpcode::Read.into_primitive(),
2201                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2202                            ..Default::default()
2203                        },
2204                        reqid: 2,
2205                        group: 1,
2206                        vmoid: vmo_id.id,
2207                        length: 1,
2208                        ..Default::default()
2209                    })
2210                    .await
2211                    .unwrap();
2212
2213                let mut response = BlockFifoResponse::default();
2214                reader.read_entries(&mut response).await.unwrap();
2215                assert_eq!(response.status, zx::sys::ZX_OK);
2216                assert_eq!(response.reqid, 2);
2217                assert_eq!(response.group, 1);
2218            }
2219        );
2220    }
2221
2222    #[fuchsia::test]
2223    async fn test_group_error() {
2224        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2225
2226        let counter = Arc::new(AtomicU64::new(0));
2227        let counter_clone = counter.clone();
2228
2229        futures::join!(
2230            async move {
2231                let block_server = BlockServer::new(
2232                    BLOCK_SIZE,
2233                    Arc::new(MockInterface {
2234                        read_hook: Some(Box::new(move |_, _, _, _| {
2235                            counter_clone.fetch_add(1, Ordering::Relaxed);
2236                            Box::pin(async { Err(zx::Status::BAD_STATE) })
2237                        })),
2238                        ..MockInterface::default()
2239                    }),
2240                );
2241                block_server.handle_requests(stream).await.unwrap();
2242            },
2243            async move {
2244                let (session_proxy, server) = fidl::endpoints::create_proxy();
2245
2246                proxy.open_session(server).unwrap();
2247
2248                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2249                let vmo_id = session_proxy
2250                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2251                    .await
2252                    .unwrap()
2253                    .unwrap();
2254
2255                let mut fifo =
2256                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2257                let (mut reader, mut writer) = fifo.async_io();
2258
2259                writer
2260                    .write_entries(&BlockFifoRequest {
2261                        command: BlockFifoCommand {
2262                            opcode: BlockOpcode::Read.into_primitive(),
2263                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2264                            ..Default::default()
2265                        },
2266                        group: 1,
2267                        vmoid: vmo_id.id,
2268                        length: 1,
2269                        ..Default::default()
2270                    })
2271                    .await
2272                    .unwrap();
2273
2274                // Wait until processed.
2275                poll_fn(|cx: &mut Context<'_>| {
2276                    if counter.load(Ordering::Relaxed) == 1 {
2277                        Poll::Ready(())
2278                    } else {
2279                        // Yield to the executor.
2280                        cx.waker().wake_by_ref();
2281                        Poll::Pending
2282                    }
2283                })
2284                .await;
2285
2286                let mut response = BlockFifoResponse::default();
2287                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2288
2289                writer
2290                    .write_entries(&BlockFifoRequest {
2291                        command: BlockFifoCommand {
2292                            opcode: BlockOpcode::Read.into_primitive(),
2293                            flags: BlockIoFlag::GROUP_ITEM.bits(),
2294                            ..Default::default()
2295                        },
2296                        group: 1,
2297                        vmoid: vmo_id.id,
2298                        length: 1,
2299                        ..Default::default()
2300                    })
2301                    .await
2302                    .unwrap();
2303
2304                writer
2305                    .write_entries(&BlockFifoRequest {
2306                        command: BlockFifoCommand {
2307                            opcode: BlockOpcode::Read.into_primitive(),
2308                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2309                            ..Default::default()
2310                        },
2311                        reqid: 2,
2312                        group: 1,
2313                        vmoid: vmo_id.id,
2314                        length: 1,
2315                        ..Default::default()
2316                    })
2317                    .await
2318                    .unwrap();
2319
2320                reader.read_entries(&mut response).await.unwrap();
2321                assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
2322                assert_eq!(response.reqid, 2);
2323                assert_eq!(response.group, 1);
2324
2325                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
2326
2327                // Only the first request should have been processed.
2328                assert_eq!(counter.load(Ordering::Relaxed), 1);
2329            }
2330        );
2331    }
2332
2333    #[fuchsia::test]
2334    async fn test_group_with_two_lasts() {
2335        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2336
2337        let (tx, rx) = oneshot::channel();
2338
2339        futures::join!(
2340            async move {
2341                let rx = Mutex::new(Some(rx));
2342                let block_server = BlockServer::new(
2343                    BLOCK_SIZE,
2344                    Arc::new(MockInterface {
2345                        read_hook: Some(Box::new(move |_, _, _, _| {
2346                            let rx = rx.lock().take().unwrap();
2347                            Box::pin(async {
2348                                let _ = rx.await;
2349                                Ok(())
2350                            })
2351                        })),
2352                        ..MockInterface::default()
2353                    }),
2354                );
2355                block_server.handle_requests(stream).await.unwrap();
2356            },
2357            async move {
2358                let (session_proxy, server) = fidl::endpoints::create_proxy();
2359
2360                proxy.open_session(server).unwrap();
2361
2362                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2363                let vmo_id = session_proxy
2364                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2365                    .await
2366                    .unwrap()
2367                    .unwrap();
2368
2369                let mut fifo =
2370                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2371                let (mut reader, mut writer) = fifo.async_io();
2372
2373                writer
2374                    .write_entries(&BlockFifoRequest {
2375                        command: BlockFifoCommand {
2376                            opcode: BlockOpcode::Read.into_primitive(),
2377                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2378                            ..Default::default()
2379                        },
2380                        reqid: 1,
2381                        group: 1,
2382                        vmoid: vmo_id.id,
2383                        length: 1,
2384                        ..Default::default()
2385                    })
2386                    .await
2387                    .unwrap();
2388
2389                writer
2390                    .write_entries(&BlockFifoRequest {
2391                        command: BlockFifoCommand {
2392                            opcode: BlockOpcode::Read.into_primitive(),
2393                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2394                            ..Default::default()
2395                        },
2396                        reqid: 2,
2397                        group: 1,
2398                        vmoid: vmo_id.id,
2399                        length: 1,
2400                        ..Default::default()
2401                    })
2402                    .await
2403                    .unwrap();
2404
2405                // Send an independent request to flush through the fifo.
2406                writer
2407                    .write_entries(&BlockFifoRequest {
2408                        command: BlockFifoCommand {
2409                            opcode: BlockOpcode::CloseVmo.into_primitive(),
2410                            ..Default::default()
2411                        },
2412                        reqid: 3,
2413                        vmoid: vmo_id.id,
2414                        ..Default::default()
2415                    })
2416                    .await
2417                    .unwrap();
2418
2419                // It should succeed.
2420                let mut response = BlockFifoResponse::default();
2421                reader.read_entries(&mut response).await.unwrap();
2422                assert_eq!(response.status, zx::sys::ZX_OK);
2423                assert_eq!(response.reqid, 3);
2424
2425                // Now release the original request.
2426                tx.send(()).unwrap();
2427
2428                // The response should be for the first message tagged as last, and it should be
2429                // an error because we sent two messages with the LAST marker.
2430                let mut response = BlockFifoResponse::default();
2431                reader.read_entries(&mut response).await.unwrap();
2432                assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2433                assert_eq!(response.reqid, 1);
2434                assert_eq!(response.group, 1);
2435            }
2436        );
2437    }
2438
2439    #[fuchsia::test(allow_stalls = false)]
2440    async fn test_requests_dont_block_sessions() {
2441        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2442
2443        let (tx, rx) = oneshot::channel();
2444
2445        fasync::Task::local(async move {
2446            let rx = Mutex::new(Some(rx));
2447            let block_server = BlockServer::new(
2448                BLOCK_SIZE,
2449                Arc::new(MockInterface {
2450                    read_hook: Some(Box::new(move |_, _, _, _| {
2451                        let rx = rx.lock().take().unwrap();
2452                        Box::pin(async {
2453                            let _ = rx.await;
2454                            Ok(())
2455                        })
2456                    })),
2457                    ..MockInterface::default()
2458                }),
2459            );
2460            block_server.handle_requests(stream).await.unwrap();
2461        })
2462        .detach();
2463
2464        let mut fut = pin!(async {
2465            let (session_proxy, server) = fidl::endpoints::create_proxy();
2466
2467            proxy.open_session(server).unwrap();
2468
2469            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2470            let vmo_id = session_proxy
2471                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2472                .await
2473                .unwrap()
2474                .unwrap();
2475
2476            let mut fifo =
2477                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2478            let (mut reader, mut writer) = fifo.async_io();
2479
2480            writer
2481                .write_entries(&BlockFifoRequest {
2482                    command: BlockFifoCommand {
2483                        opcode: BlockOpcode::Read.into_primitive(),
2484                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2485                        ..Default::default()
2486                    },
2487                    reqid: 1,
2488                    group: 1,
2489                    vmoid: vmo_id.id,
2490                    length: 1,
2491                    ..Default::default()
2492                })
2493                .await
2494                .unwrap();
2495
2496            let mut response = BlockFifoResponse::default();
2497            reader.read_entries(&mut response).await.unwrap();
2498            assert_eq!(response.status, zx::sys::ZX_OK);
2499        });
2500
2501        // The response won't come back until we send on `tx`.
2502        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2503
2504        let mut fut2 = pin!(proxy.get_volume_info());
2505
2506        // get_volume_info is set up to stall forever.
2507        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2508
2509        // If we now free up the first future, it should resolve; the stalled call to
2510        // get_volume_info should not block the fifo response.
2511        let _ = tx.send(());
2512
2513        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2514    }
2515
2516    #[fuchsia::test]
2517    async fn test_request_flow_control() {
2518        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2519
2520        // The client will ensure that MAX_REQUESTS are queued up before firing `event`, and the
2521        // server will block until that happens.
2522        const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2523        let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2524        let event_clone = event.clone();
2525        futures::join!(
2526            async move {
2527                let block_server = BlockServer::new(
2528                    BLOCK_SIZE,
2529                    Arc::new(MockInterface {
2530                        read_hook: Some(Box::new(move |_, _, _, _| {
2531                            let event_clone = event_clone.clone();
2532                            Box::pin(async move {
2533                                if !event_clone.1.load(Ordering::SeqCst) {
2534                                    event_clone.0.listen().await;
2535                                }
2536                                Ok(())
2537                            })
2538                        })),
2539                        ..MockInterface::default()
2540                    }),
2541                );
2542                block_server.handle_requests(stream).await.unwrap();
2543            },
2544            async move {
2545                let (session_proxy, server) = fidl::endpoints::create_proxy();
2546
2547                proxy.open_session(server).unwrap();
2548
2549                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2550                let vmo_id = session_proxy
2551                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2552                    .await
2553                    .unwrap()
2554                    .unwrap();
2555
2556                let mut fifo =
2557                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2558                let (mut reader, mut writer) = fifo.async_io();
2559
2560                for i in 0..MAX_REQUESTS {
2561                    writer
2562                        .write_entries(&BlockFifoRequest {
2563                            command: BlockFifoCommand {
2564                                opcode: BlockOpcode::Read.into_primitive(),
2565                                ..Default::default()
2566                            },
2567                            reqid: (i + 1) as u32,
2568                            dev_offset: i,
2569                            vmoid: vmo_id.id,
2570                            length: 1,
2571                            ..Default::default()
2572                        })
2573                        .await
2574                        .unwrap();
2575                }
2576                assert!(
2577                    futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2578                        command: BlockFifoCommand {
2579                            opcode: BlockOpcode::Read.into_primitive(),
2580                            ..Default::default()
2581                        },
2582                        reqid: u32::MAX,
2583                        dev_offset: MAX_REQUESTS,
2584                        vmoid: vmo_id.id,
2585                        length: 1,
2586                        ..Default::default()
2587                    })))
2588                    .is_pending()
2589                );
2590                // OK, let the server start to process.
2591                event.1.store(true, Ordering::SeqCst);
2592                event.0.notify(usize::MAX);
2593                // For each entry we read, make sure we can write a new one in.
2594                let mut finished_reqids = vec![];
2595                for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2596                    let mut response = BlockFifoResponse::default();
2597                    reader.read_entries(&mut response).await.unwrap();
2598                    assert_eq!(response.status, zx::sys::ZX_OK);
2599                    finished_reqids.push(response.reqid);
2600                    writer
2601                        .write_entries(&BlockFifoRequest {
2602                            command: BlockFifoCommand {
2603                                opcode: BlockOpcode::Read.into_primitive(),
2604                                ..Default::default()
2605                            },
2606                            reqid: (i + 1) as u32,
2607                            dev_offset: i,
2608                            vmoid: vmo_id.id,
2609                            length: 1,
2610                            ..Default::default()
2611                        })
2612                        .await
2613                        .unwrap();
2614                }
2615                let mut response = BlockFifoResponse::default();
2616                for _ in 0..MAX_REQUESTS {
2617                    reader.read_entries(&mut response).await.unwrap();
2618                    assert_eq!(response.status, zx::sys::ZX_OK);
2619                    finished_reqids.push(response.reqid);
2620                }
2621                // Verify that we got a response for each request.  Note that we can't assume FIFO
2622                // ordering.
2623                finished_reqids.sort();
2624                assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2625                let mut i = 1;
2626                for reqid in finished_reqids {
2627                    assert_eq!(reqid, i);
2628                    i += 1;
2629                }
2630            }
2631        );
2632    }
2633
2634    #[fuchsia::test]
2635    async fn test_passthrough_io_with_fixed_map() {
2636        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2637
2638        let expected_op = Arc::new(Mutex::new(None));
2639        let expected_op_clone = expected_op.clone();
2640        futures::join!(
2641            async {
2642                let block_server = BlockServer::new(
2643                    BLOCK_SIZE,
2644                    Arc::new(IoMockInterface {
2645                        return_errors: false,
2646                        do_checks: true,
2647                        expected_op: expected_op_clone,
2648                    }),
2649                );
2650                block_server.handle_requests(stream).await.unwrap();
2651            },
2652            async move {
2653                let (session_proxy, server) = fidl::endpoints::create_proxy();
2654
2655                let mapping = fblock::BlockOffsetMapping {
2656                    source_block_offset: 0,
2657                    target_block_offset: 10,
2658                    length: 20,
2659                };
2660                proxy.open_session_with_offset_map(server, &mapping).unwrap();
2661
2662                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2663                let vmo_id = session_proxy
2664                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2665                    .await
2666                    .unwrap()
2667                    .unwrap();
2668
2669                let mut fifo =
2670                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2671                let (mut reader, mut writer) = fifo.async_io();
2672
2673                // READ
2674                *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2675                writer
2676                    .write_entries(&BlockFifoRequest {
2677                        command: BlockFifoCommand {
2678                            opcode: BlockOpcode::Read.into_primitive(),
2679                            ..Default::default()
2680                        },
2681                        vmoid: vmo_id.id,
2682                        dev_offset: 1,
2683                        length: 2,
2684                        vmo_offset: 3,
2685                        ..Default::default()
2686                    })
2687                    .await
2688                    .unwrap();
2689
2690                let mut response = BlockFifoResponse::default();
2691                reader.read_entries(&mut response).await.unwrap();
2692                assert_eq!(response.status, zx::sys::ZX_OK);
2693
2694                // WRITE
2695                *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2696                writer
2697                    .write_entries(&BlockFifoRequest {
2698                        command: BlockFifoCommand {
2699                            opcode: BlockOpcode::Write.into_primitive(),
2700                            ..Default::default()
2701                        },
2702                        vmoid: vmo_id.id,
2703                        dev_offset: 4,
2704                        length: 5,
2705                        vmo_offset: 6,
2706                        ..Default::default()
2707                    })
2708                    .await
2709                    .unwrap();
2710
2711                reader.read_entries(&mut response).await.unwrap();
2712                assert_eq!(response.status, zx::sys::ZX_OK);
2713
2714                // FLUSH
2715                *expected_op.lock() = Some(ExpectedOp::Flush);
2716                writer
2717                    .write_entries(&BlockFifoRequest {
2718                        command: BlockFifoCommand {
2719                            opcode: BlockOpcode::Flush.into_primitive(),
2720                            ..Default::default()
2721                        },
2722                        ..Default::default()
2723                    })
2724                    .await
2725                    .unwrap();
2726
2727                reader.read_entries(&mut response).await.unwrap();
2728                assert_eq!(response.status, zx::sys::ZX_OK);
2729
2730                // TRIM
2731                *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2732                writer
2733                    .write_entries(&BlockFifoRequest {
2734                        command: BlockFifoCommand {
2735                            opcode: BlockOpcode::Trim.into_primitive(),
2736                            ..Default::default()
2737                        },
2738                        dev_offset: 7,
2739                        length: 3,
2740                        ..Default::default()
2741                    })
2742                    .await
2743                    .unwrap();
2744
2745                reader.read_entries(&mut response).await.unwrap();
2746                assert_eq!(response.status, zx::sys::ZX_OK);
2747
2748                // READ past window
2749                *expected_op.lock() = None;
2750                writer
2751                    .write_entries(&BlockFifoRequest {
2752                        command: BlockFifoCommand {
2753                            opcode: BlockOpcode::Read.into_primitive(),
2754                            ..Default::default()
2755                        },
2756                        vmoid: vmo_id.id,
2757                        dev_offset: 19,
2758                        length: 2,
2759                        vmo_offset: 3,
2760                        ..Default::default()
2761                    })
2762                    .await
2763                    .unwrap();
2764
2765                reader.read_entries(&mut response).await.unwrap();
2766                assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2767
2768                std::mem::drop(proxy);
2769            }
2770        );
2771    }
2772
2773    #[fuchsia::test]
2774    fn operation_map() {
2775        fn expect_map_result(
2776            mut operation: Operation,
2777            mapping: Option<BlockOffsetMapping>,
2778            max_blocks: Option<NonZero<u32>>,
2779            expected_operations: Vec<Operation>,
2780        ) {
2781            let mut ops = vec![];
2782            while let Some(remainder) = operation.map(mapping.as_ref(), max_blocks.clone(), 512) {
2783                ops.push(operation);
2784                operation = remainder;
2785            }
2786            ops.push(operation);
2787            assert_eq!(ops, expected_operations);
2788        }
2789
2790        // No limits
2791        expect_map_result(
2792            Operation::Read {
2793                device_block_offset: 10,
2794                block_count: 200,
2795                _unused: 0,
2796                vmo_offset: 0,
2797            },
2798            None,
2799            None,
2800            vec![Operation::Read {
2801                device_block_offset: 10,
2802                block_count: 200,
2803                _unused: 0,
2804                vmo_offset: 0,
2805            }],
2806        );
2807
2808        // Max block count
2809        expect_map_result(
2810            Operation::Read {
2811                device_block_offset: 10,
2812                block_count: 200,
2813                _unused: 0,
2814                vmo_offset: 0,
2815            },
2816            None,
2817            NonZero::new(120),
2818            vec![
2819                Operation::Read {
2820                    device_block_offset: 10,
2821                    block_count: 120,
2822                    _unused: 0,
2823                    vmo_offset: 0,
2824                },
2825                Operation::Read {
2826                    device_block_offset: 130,
2827                    block_count: 80,
2828                    _unused: 0,
2829                    vmo_offset: 120 * 512,
2830                },
2831            ],
2832        );
2833        expect_map_result(
2834            Operation::Write {
2835                device_block_offset: 10,
2836                block_count: 200,
2837                _unused: 0,
2838                options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
2839                vmo_offset: 0,
2840            },
2841            None,
2842            NonZero::new(120),
2843            vec![
2844                Operation::Write {
2845                    device_block_offset: 10,
2846                    block_count: 120,
2847                    _unused: 0,
2848                    options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
2849                    vmo_offset: 0,
2850                },
2851                Operation::Write {
2852                    device_block_offset: 130,
2853                    block_count: 80,
2854                    _unused: 0,
2855                    options: WriteOptions::default(),
2856                    vmo_offset: 120 * 512,
2857                },
2858            ],
2859        );
2860        expect_map_result(
2861            Operation::Trim { device_block_offset: 10, block_count: 200 },
2862            None,
2863            NonZero::new(120),
2864            vec![Operation::Trim { device_block_offset: 10, block_count: 200 }],
2865        );
2866
2867        // Remapping + Max block count
2868        expect_map_result(
2869            Operation::Read {
2870                device_block_offset: 10,
2871                block_count: 200,
2872                _unused: 0,
2873                vmo_offset: 0,
2874            },
2875            Some(BlockOffsetMapping {
2876                source_block_offset: 10,
2877                target_block_offset: 100,
2878                length: 200,
2879            }),
2880            NonZero::new(120),
2881            vec![
2882                Operation::Read {
2883                    device_block_offset: 100,
2884                    block_count: 120,
2885                    _unused: 0,
2886                    vmo_offset: 0,
2887                },
2888                Operation::Read {
2889                    device_block_offset: 220,
2890                    block_count: 80,
2891                    _unused: 0,
2892                    vmo_offset: 120 * 512,
2893                },
2894            ],
2895        );
2896        expect_map_result(
2897            Operation::Write {
2898                device_block_offset: 10,
2899                block_count: 200,
2900                _unused: 0,
2901                options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
2902                vmo_offset: 0,
2903            },
2904            Some(BlockOffsetMapping {
2905                source_block_offset: 10,
2906                target_block_offset: 100,
2907                length: 200,
2908            }),
2909            NonZero::new(120),
2910            vec![
2911                Operation::Write {
2912                    device_block_offset: 100,
2913                    block_count: 120,
2914                    _unused: 0,
2915                    options: WriteOptions { flags: WriteFlags::PRE_BARRIER, ..Default::default() },
2916                    vmo_offset: 0,
2917                },
2918                Operation::Write {
2919                    device_block_offset: 220,
2920                    block_count: 80,
2921                    _unused: 0,
2922                    options: WriteOptions::default(),
2923                    vmo_offset: 120 * 512,
2924                },
2925            ],
2926        );
2927        expect_map_result(
2928            Operation::Trim { device_block_offset: 10, block_count: 200 },
2929            Some(BlockOffsetMapping {
2930                source_block_offset: 10,
2931                target_block_offset: 100,
2932                length: 200,
2933            }),
2934            NonZero::new(120),
2935            vec![Operation::Trim { device_block_offset: 100, block_count: 200 }],
2936        );
2937    }
2938}