block_server/
lib.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use anyhow::{anyhow, Error};
5use block_protocol::{BlockFifoRequest, BlockFifoResponse};
6use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
7use futures::{Future, FutureExt as _, TryStreamExt as _};
8use std::borrow::Cow;
9use std::collections::btree_map::Entry;
10use std::collections::BTreeMap;
11use std::num::NonZero;
12use std::ops::Range;
13use std::sync::atomic::AtomicU64;
14use std::sync::{Arc, Mutex};
15use zx::HandleBased;
16use {
17    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
18    fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
19};
20
21pub mod async_interface;
22pub mod c_interface;
23
/// Describes the device being served, as returned by `SessionManager::get_info`.
#[derive(Clone)]
pub enum DeviceInfo {
    /// A block device that is not a partition.
    Block(BlockInfo),
    /// A block device that is also a partition.
    Partition(PartitionInfo),
}
29
/// Information associated with non-partition block devices.
#[derive(Clone)]
pub struct BlockInfo {
    /// The device flags; reported unchanged as `flags` in the `GetInfo` response.
    pub device_flags: fblock::Flag,
    /// The number of blocks; reported unchanged as `block_count` in the `GetInfo` response.
    pub block_count: u64,
}
36
/// Information associated with a block device that is also a partition.
#[derive(Clone)]
pub struct PartitionInfo {
    /// The device flags reported by the underlying device.
    pub device_flags: fblock::Flag,
    /// If `block_range` is None, the partition is a volume and may not be contiguous.
    /// In this case, the server will use the `get_volume_info` method to get the count of assigned
    /// slices and use that (along with the slice and block sizes) to determine the block count.
    ///
    /// When set, the block count reported by `GetInfo` is `end - start`, and `GetMetadata`
    /// reports `start` as the start block offset.
    pub block_range: Option<Range<u64>>,
    /// The GUID returned by `GetTypeGuid` and in the `GetMetadata` response.
    pub type_guid: [u8; 16],
    /// The GUID returned by `GetInstanceGuid` and in the `GetMetadata` response.
    pub instance_guid: [u8; 16],
    /// The name returned by `GetName` and in the `GetMetadata` response.
    pub name: String,
    /// Returned as `flags` in the `GetMetadata` response.
    pub flags: u64,
}
51
// Multiple block I/O requests may be sent as a group.
// Notes:
// - the group is identified by the group id in the request
// - if using groups, a response will not be sent unless the `BlockIoFlag::GROUP_LAST`
//   flag is set.
// - when processing a request of a group fails, subsequent requests of that
//   group will not be processed.
//
// Refer to sdk/fidl/fuchsia.hardware.block.driver/block.fidl for details.
//
// FifoMessageGroup keeps track of the relevant BlockFifoResponse fields for
// a group of requests. Only `status` and `count` need to be updated.
struct FifoMessageGroup {
    // The first non-OK status recorded for the group, or OK if none has been recorded.
    status: zx::Status,
    // The number of requests of this group that have been decoded but not yet completed.
    count: u32,
    // The request id of the request flagged `GROUP_LAST`; `None` until that request arrives.
    req_id: Option<u32>,
}
69
70impl FifoMessageGroup {
71    fn new() -> Self {
72        FifoMessageGroup { status: zx::Status::OK, count: 0, req_id: None }
73    }
74}
75
/// The request groups currently known to a session, keyed by group id and guarded by a mutex.
#[derive(Default)]
struct FifoMessageGroups(Mutex<BTreeMap<u16, FifoMessageGroup>>);
78
79// Keeps track of all the group requests that are currently being processed
80impl FifoMessageGroups {
81    /// Completes a request and returns a response to be sent if it's the last outstanding request
82    /// for this group.
83    fn complete(&self, group_id: u16, status: zx::Status) -> Option<BlockFifoResponse> {
84        let mut map = self.0.lock().unwrap();
85        let Entry::Occupied(mut o) = map.entry(group_id) else { unreachable!() };
86        let group = o.get_mut();
87        if group.count == 1 {
88            if let Some(reqid) = group.req_id {
89                let status =
90                    if group.status != zx::Status::OK { group.status } else { status }.into_raw();
91
92                o.remove();
93
94                return Some(BlockFifoResponse {
95                    status,
96                    reqid,
97                    group: group_id,
98                    ..Default::default()
99                });
100            }
101        }
102
103        group.count = group.count.checked_sub(1).unwrap();
104        if status != zx::Status::OK && group.status == zx::Status::OK {
105            group.status = status
106        }
107        None
108    }
109}
110
/// BlockServer serves `fuchsia.hardware.block.volume/Volume` request streams (see
/// `handle_requests`), delegating device-specific work to a `SessionManager`.
/// cbindgen:no-export
pub struct BlockServer<SM> {
    /// Reported as `block_size` by `GetInfo` and passed to every session that is opened.
    block_size: u32,
    /// Supplies device information and runs the sessions opened by clients.
    session_manager: Arc<SM>,
}
117
/// A single entry in [`OffsetMap`].
pub struct BlockOffsetMapping {
    /// The first block of the range as addressed by incoming requests.
    source_block_offset: u64,
    /// The block that `source_block_offset` is remapped to.
    target_block_offset: u64,
    /// The number of blocks covered by the mapping.
    length: u64,
}
124
125impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
126    type Error = zx::Status;
127
128    fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
129        wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
130        wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
131        Ok(Self {
132            source_block_offset: wire.source_block_offset,
133            target_block_offset: wire.target_block_offset,
134            length: wire.length,
135        })
136    }
137}
138
/// Remaps the offset of block requests based on an internal map.
/// TODO(https://fxbug.dev/402515764): For now, this just supports a single entry in the map, which
/// is all that is required for GPT partitions.  If we want to support this for FVM, we will need
/// to support multiple entries, which requires changing the block server to support request
/// splitting.
pub struct OffsetMap {
    /// The single mapping that every request must fall entirely within.
    mapping: BlockOffsetMapping,
}
147
148impl OffsetMap {
149    pub fn new(mapping: BlockOffsetMapping) -> Self {
150        Self { mapping }
151    }
152
153    /// Adjusts the requested range, returning the new dev_offset.
154    /// Returns false if the request would exceed the range known to OffsetMap.
155    pub fn adjust_request(&self, dev_offset: u64, length: u64) -> Option<u64> {
156        if length == 0 {
157            return None;
158        }
159        let end = dev_offset.checked_add(length)?;
160        if self.mapping.source_block_offset > dev_offset
161            || end > self.mapping.source_block_offset + self.mapping.length
162        {
163            return None;
164        }
165        let delta = dev_offset - self.mapping.source_block_offset;
166        Some(self.mapping.target_block_offset + delta)
167    }
168}
169
// Methods take Arc<Self> rather than &self because of
// https://github.com/rust-lang/rust/issues/42940.
/// The device-specific half of a block server.  `BlockServer` forwards the FIDL calls it cannot
/// answer itself to an implementation of this trait.
pub trait SessionManager: 'static {
    /// Called with a VMO that a client is attaching to a session.
    fn on_attach_vmo(
        self: Arc<Self>,
        vmo: &Arc<zx::Vmo>,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Creates a new session to handle `stream`.
    /// The returned future should run until the session completes, for example when the client end
    /// closes.
    /// If `offset_map` is set, it will be used to remap the dev_offset of FIFO requests.
    fn open_session(
        self: Arc<Self>,
        stream: fblock::SessionRequestStream,
        offset_map: Option<OffsetMap>,
        block_size: u32,
    ) -> impl Future<Output = Result<(), Error>> + Send;

    /// Called to get block/partition information for Block::GetInfo, Partition::GetTypeGuid, etc.
    fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;

    /// Called to handle the GetVolumeInfo FIDL call.
    /// The default implementation returns `NOT_SUPPORTED`.
    fn get_volume_info(
        &self,
    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
    {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the QuerySlices FIDL call.
    /// The default implementation returns `NOT_SUPPORTED`.
    fn query_slices(
        &self,
        _start_slices: &[u64],
    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the Extend FIDL call.
    /// The default implementation returns `NOT_SUPPORTED`.
    fn extend(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }

    /// Called to handle the Shrink FIDL call.
    /// The default implementation returns `NOT_SUPPORTED`.
    fn shrink(
        &self,
        _start_slice: u64,
        _slice_count: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        async { Err(zx::Status::NOT_SUPPORTED) }
    }
}
226
/// Conversion into a shared [`SessionManager`]; this is what `BlockServer::new` accepts.
pub trait IntoSessionManager {
    /// The session manager type produced by the conversion.
    type SM: SessionManager;

    /// Consumes `self` and returns the session manager.
    fn into_session_manager(self) -> Arc<Self::SM>;
}
232
233impl<SM: SessionManager> BlockServer<SM> {
234    pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
235        Self { block_size, session_manager: session_manager.into_session_manager() }
236    }
237
238    /// Called to process requests for fuchsia.hardware.block.volume/Volume.
239    pub async fn handle_requests(
240        &self,
241        mut requests: fvolume::VolumeRequestStream,
242    ) -> Result<(), Error> {
243        let scope = fasync::Scope::new();
244        while let Some(request) = requests.try_next().await.unwrap() {
245            if let Some(session) = self.handle_request(request).await? {
246                scope.spawn(session.map(|_| ()));
247            }
248        }
249        scope.await;
250        Ok(())
251    }
252
253    /// Processes a partition request.  If a new session task is created in response to the request,
254    /// it is returned.
255    async fn handle_request(
256        &self,
257        request: fvolume::VolumeRequest,
258    ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send>, Error> {
259        match request {
260            fvolume::VolumeRequest::GetInfo { responder } => match self.device_info().await {
261                Ok(info) => {
262                    let (block_count, flags) = match info.as_ref() {
263                        DeviceInfo::Block(BlockInfo { block_count, device_flags }) => {
264                            (*block_count, *device_flags)
265                        }
266                        DeviceInfo::Partition(partition_info) => {
267                            let block_count = if let Some(range) =
268                                partition_info.block_range.as_ref()
269                            {
270                                range.end - range.start
271                            } else {
272                                let volume_info = self.session_manager.get_volume_info().await?;
273                                volume_info.0.slice_size * volume_info.1.partition_slice_count
274                                    / self.block_size as u64
275                            };
276                            (block_count, partition_info.device_flags)
277                        }
278                    };
279                    responder.send(Ok(&fblock::BlockInfo {
280                        block_count,
281                        block_size: self.block_size,
282                        max_transfer_size: fblock::MAX_TRANSFER_UNBOUNDED,
283                        flags,
284                    }))?;
285                }
286                Err(status) => responder.send(Err(status.into_raw()))?,
287            },
288            fvolume::VolumeRequest::GetStats { clear: _, responder } => {
289                // TODO(https://fxbug.dev/348077960): Implement this
290                responder.send(Err(zx::Status::NOT_SUPPORTED.into_raw()))?;
291            }
292            fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
293                return Ok(Some(self.session_manager.clone().open_session(
294                    session.into_stream(),
295                    None,
296                    self.block_size,
297                )));
298            }
299            fvolume::VolumeRequest::OpenSessionWithOffsetMap {
300                session,
301                offset_map,
302                initial_mappings,
303                control_handle: _,
304            } => {
305                if offset_map.is_some() || initial_mappings.as_ref().is_none_or(|m| m.len() != 1) {
306                    // TODO(https://fxbug.dev/402515764): Support multiple mappings and
307                    // dynamic querying for FVM as needed.  A single static map is
308                    // sufficient for GPT.
309                    session.close_with_epitaph(zx::Status::NOT_SUPPORTED)?;
310                    return Ok(None);
311                }
312                let initial_mapping = match initial_mappings.unwrap().pop().unwrap().try_into() {
313                    Ok(m) => m,
314                    Err(status) => {
315                        session.close_with_epitaph(status)?;
316                        return Ok(None);
317                    }
318                };
319                let offset_map = OffsetMap::new(initial_mapping);
320                return Ok(Some(self.session_manager.clone().open_session(
321                    session.into_stream(),
322                    Some(offset_map),
323                    self.block_size,
324                )));
325            }
326            fvolume::VolumeRequest::GetTypeGuid { responder } => match self.device_info().await {
327                Ok(info) => {
328                    if let DeviceInfo::Partition(partition_info) = info.as_ref() {
329                        let mut guid =
330                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
331                        guid.value.copy_from_slice(&partition_info.type_guid);
332                        responder.send(zx::sys::ZX_OK, Some(&guid))?;
333                    } else {
334                        responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
335                    }
336                }
337                Err(status) => {
338                    responder.send(status.into_raw(), None)?;
339                }
340            },
341            fvolume::VolumeRequest::GetInstanceGuid { responder } => {
342                match self.device_info().await {
343                    Ok(info) => {
344                        if let DeviceInfo::Partition(partition_info) = info.as_ref() {
345                            let mut guid =
346                                fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
347                            guid.value.copy_from_slice(&partition_info.instance_guid);
348                            responder.send(zx::sys::ZX_OK, Some(&guid))?;
349                        } else {
350                            responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
351                        }
352                    }
353                    Err(status) => {
354                        responder.send(status.into_raw(), None)?;
355                    }
356                }
357            }
358            fvolume::VolumeRequest::GetName { responder } => match self.device_info().await {
359                Ok(info) => {
360                    if let DeviceInfo::Partition(partition_info) = info.as_ref() {
361                        responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
362                    } else {
363                        responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
364                    }
365                }
366                Err(status) => {
367                    responder.send(status.into_raw(), None)?;
368                }
369            },
370            fvolume::VolumeRequest::GetMetadata { responder } => match self.device_info().await {
371                Ok(info) => {
372                    if let DeviceInfo::Partition(info) = info.as_ref() {
373                        let mut type_guid =
374                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
375                        type_guid.value.copy_from_slice(&info.type_guid);
376                        let mut instance_guid =
377                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
378                        instance_guid.value.copy_from_slice(&info.instance_guid);
379                        responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
380                            name: Some(info.name.clone()),
381                            type_guid: Some(type_guid),
382                            instance_guid: Some(instance_guid),
383                            start_block_offset: info.block_range.as_ref().map(|range| range.start),
384                            num_blocks: info
385                                .block_range
386                                .as_ref()
387                                .map(|range| range.end - range.start),
388                            flags: Some(info.flags),
389                            ..Default::default()
390                        }))?;
391                    }
392                }
393                Err(status) => responder.send(Err(status.into_raw()))?,
394            },
395            fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
396                match self.session_manager.query_slices(&start_slices).await {
397                    Ok(mut results) => {
398                        let results_len = results.len();
399                        assert!(results_len <= 16);
400                        results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
401                        responder.send(
402                            zx::sys::ZX_OK,
403                            &results.try_into().unwrap(),
404                            results_len as u64,
405                        )?;
406                    }
407                    Err(s) => {
408                        responder.send(
409                            s.into_raw(),
410                            &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
411                            0,
412                        )?;
413                    }
414                }
415            }
416            fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
417                match self.session_manager.get_volume_info().await {
418                    Ok((manager_info, volume_info)) => {
419                        responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
420                    }
421                    Err(s) => responder.send(s.into_raw(), None, None)?,
422                }
423            }
424            fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
425                responder.send(
426                    zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
427                        .into_raw(),
428                )?;
429            }
430            fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
431                responder.send(
432                    zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
433                        .into_raw(),
434                )?;
435            }
436            fvolume::VolumeRequest::Destroy { responder, .. } => {
437                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
438            }
439        }
440        Ok(None)
441    }
442
    /// Fetches the device information from the session manager.
    async fn device_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
        self.session_manager.get_info().await
    }
446}
447
/// Per-session state: the FIFO handed out to the client, the VMOs the client has attached, and
/// the request groups currently being processed.
struct SessionHelper<SM: SessionManager> {
    /// Notified when a VMO is attached.
    session_manager: Arc<SM>,
    /// If set, used to remap the device offset of decoded requests.
    offset_map: Option<OffsetMap>,
    /// Multiplied with a request's `vmo_offset` when decoding reads and writes.
    block_size: u32,
    /// The FIFO end that is duplicated and returned to the client by `GetFifo`.
    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
    /// The attached VMOs, keyed by the id returned from `AttachVmo`.
    vmos: Mutex<BTreeMap<u16, Arc<zx::Vmo>>>,
    /// Tracks requests that were sent as part of a group.
    message_groups: FifoMessageGroups,
}
456
457impl<SM: SessionManager> SessionHelper<SM> {
458    fn new(
459        session_manager: Arc<SM>,
460        offset_map: Option<OffsetMap>,
461        block_size: u32,
462    ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
463        let (peer_fifo, fifo) = zx::Fifo::create(16)?;
464        Ok((
465            Self {
466                session_manager,
467                offset_map,
468                block_size,
469                peer_fifo,
470                vmos: Mutex::default(),
471                message_groups: FifoMessageGroups::default(),
472            },
473            fifo,
474        ))
475    }
476
477    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
478        match request {
479            fblock::SessionRequest::GetFifo { responder } => {
480                let rights = zx::Rights::TRANSFER
481                    | zx::Rights::READ
482                    | zx::Rights::WRITE
483                    | zx::Rights::SIGNAL
484                    | zx::Rights::WAIT;
485                match self.peer_fifo.duplicate_handle(rights) {
486                    Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
487                    Err(s) => responder.send(Err(s.into_raw()))?,
488                }
489                Ok(())
490            }
491            fblock::SessionRequest::AttachVmo { vmo, responder } => {
492                let vmo = Arc::new(vmo);
493                let vmo_id = {
494                    let mut vmos = self.vmos.lock().unwrap();
495                    if vmos.len() == u16::MAX as usize {
496                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
497                        return Ok(());
498                    } else {
499                        let vmo_id = match vmos.last_entry() {
500                            None => 1,
501                            Some(o) => {
502                                o.key().checked_add(1).unwrap_or_else(|| {
503                                    let mut vmo_id = 1;
504                                    // Find the first gap...
505                                    for (&id, _) in &*vmos {
506                                        if id > vmo_id {
507                                            break;
508                                        }
509                                        vmo_id = id + 1;
510                                    }
511                                    vmo_id
512                                })
513                            }
514                        };
515                        vmos.insert(vmo_id, vmo.clone());
516                        vmo_id
517                    }
518                };
519                self.session_manager.clone().on_attach_vmo(&vmo).await?;
520                responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
521                Ok(())
522            }
523            fblock::SessionRequest::Close { responder } => {
524                responder.send(Ok(()))?;
525                Err(anyhow!("Closed"))
526            }
527        }
528    }
529
530    fn finish_fifo_request(
531        &self,
532        request: RequestTracking,
533        status: zx::Status,
534    ) -> Option<BlockFifoResponse> {
535        match request.group_or_request {
536            GroupOrRequest::Group(group_id) => {
537                let response = self.message_groups.complete(group_id, status);
538                fuchsia_trace::duration!(
539                    c"storage",
540                    c"block_server::finish_transaction_in_group",
541                    "group" => u32::from(group_id),
542                    "group_completed" => response.is_some(),
543                    "status" => status.into_raw());
544                if let Some(trace_flow_id) = request.trace_flow_id {
545                    fuchsia_trace::flow_step!(
546                        c"storage",
547                        c"block_server::finish_transaction",
548                        trace_flow_id.get().into()
549                    );
550                }
551                response
552            }
553            GroupOrRequest::Request(reqid) => {
554                fuchsia_trace::duration!(
555                    c"storage", c"block_server::finish_transaction", "status" => status.into_raw());
556                if let Some(trace_flow_id) = request.trace_flow_id {
557                    fuchsia_trace::flow_step!(
558                        c"storage",
559                        c"block_server::finish_transaction",
560                        trace_flow_id.get().into()
561                    );
562                }
563                Some(BlockFifoResponse { status: status.into_raw(), reqid, ..Default::default() })
564            }
565        }
566    }
567
568    fn decode_fifo_request(&self, request: &BlockFifoRequest) -> Option<DecodedRequest> {
569        let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
570        let is_group = flags.contains(BlockIoFlag::GROUP_ITEM);
571        let last_in_group = flags.contains(BlockIoFlag::GROUP_LAST);
572        let mut op_code =
573            BlockOpcode::from_primitive(request.command.opcode).ok_or(zx::Status::INVALID_ARGS);
574
575        let group_or_request = if is_group {
576            let mut groups = self.message_groups.0.lock().unwrap();
577            let group = groups.entry(request.group).or_insert_with(|| FifoMessageGroup::new());
578            if group.req_id.is_some() {
579                // We have already received a request tagged as last.
580                if group.status == zx::Status::OK {
581                    group.status = zx::Status::INVALID_ARGS;
582                }
583                return None;
584            }
585            if last_in_group {
586                group.req_id = Some(request.reqid);
587                // If the group has had an error, there is no point trying to issue this request.
588                if group.status != zx::Status::OK {
589                    op_code = Err(group.status);
590                }
591            } else if group.status != zx::Status::OK {
592                // The group has already encountered an error, so there is no point trying to issue
593                // this request.
594                return None;
595            }
596            group.count += 1;
597            GroupOrRequest::Group(request.group)
598        } else {
599            GroupOrRequest::Request(request.reqid)
600        };
601
602        let mut vmo_offset = 0;
603        let vmo = match op_code {
604            Ok(BlockOpcode::Read) | Ok(BlockOpcode::Write) => (|| {
605                if request.length == 0 {
606                    return Err(zx::Status::INVALID_ARGS);
607                }
608                vmo_offset = request
609                    .vmo_offset
610                    .checked_mul(self.block_size as u64)
611                    .ok_or(zx::Status::OUT_OF_RANGE)?;
612                self.vmos
613                    .lock()
614                    .unwrap()
615                    .get(&request.vmoid)
616                    .cloned()
617                    .map_or(Err(zx::Status::IO), |vmo| Ok(Some(vmo)))
618            })(),
619            Ok(BlockOpcode::CloseVmo) => self
620                .vmos
621                .lock()
622                .unwrap()
623                .remove(&request.vmoid)
624                .map_or(Err(zx::Status::IO), |vmo| Ok(Some(vmo))),
625            _ => Ok(None),
626        }
627        .unwrap_or_else(|e| {
628            op_code = Err(e);
629            None
630        });
631
632        let operation = op_code.map(|code| match code {
633            BlockOpcode::Read => Operation::Read {
634                device_block_offset: request.dev_offset,
635                block_count: request.length,
636                _unused: 0,
637                vmo_offset,
638            },
639            BlockOpcode::Write => Operation::Write {
640                device_block_offset: request.dev_offset,
641                block_count: request.length,
642                options: if flags.contains(BlockIoFlag::FORCE_ACCESS) {
643                    WriteOptions::FORCE_ACCESS
644                } else {
645                    WriteOptions::empty()
646                },
647                vmo_offset,
648            },
649            BlockOpcode::Flush => Operation::Flush,
650            BlockOpcode::Trim => Operation::Trim {
651                device_block_offset: request.dev_offset,
652                block_count: request.length,
653            },
654            BlockOpcode::CloseVmo => Operation::CloseVmo,
655        });
656        if let Ok(operation) = operation.as_ref() {
657            use fuchsia_trace::ArgValue;
658            static CACHE: AtomicU64 = AtomicU64::new(0);
659            if let Some(context) =
660                fuchsia_trace::TraceCategoryContext::acquire_cached(c"storage", &CACHE)
661            {
662                let trace_args_with_group = [
663                    ArgValue::of("group", u32::from(group_or_request.group_id())),
664                    ArgValue::of("opcode", operation.trace_label()),
665                ];
666                let trace_args = [ArgValue::of("opcode", operation.trace_label())];
667                let _scope = if group_or_request.is_group() {
668                    fuchsia_trace::duration(
669                        c"storage",
670                        c"block_server::start_transaction",
671                        &trace_args_with_group,
672                    )
673                } else {
674                    fuchsia_trace::duration(
675                        c"storage",
676                        c"block_server::start_transaction",
677                        &trace_args,
678                    )
679                };
680                let trace_flow_id = NonZero::new(request.trace_flow_id);
681                if let Some(trace_flow_id) = trace_flow_id.clone() {
682                    fuchsia_trace::flow_step(
683                        &context,
684                        c"block_server::start_trnsaction",
685                        trace_flow_id.get().into(),
686                        &[],
687                    );
688                }
689            }
690        }
691        Some(DecodedRequest::new(
692            RequestTracking {
693                group_or_request,
694                trace_flow_id: NonZero::new(request.trace_flow_id),
695            },
696            operation,
697            vmo,
698            self.offset_map.as_ref(),
699        ))
700    }
701
702    fn take_vmos(&self) -> BTreeMap<u16, Arc<zx::Vmo>> {
703        std::mem::take(&mut *self.vmos.lock().unwrap())
704    }
705}
706
/// A FIFO request after decoding.
#[derive(Debug)]
struct DecodedRequest {
    /// Identifies the request (or its group) so that its completion can be reported.
    request_tracking: RequestTracking,
    /// The validated operation to perform, or the status the request should fail with.
    operation: Result<Operation, zx::Status>,
    /// The VMO referenced by the request, if the operation involves one.
    vmo: Option<Arc<zx::Vmo>>,
}
713
714impl DecodedRequest {
715    fn new(
716        request_tracking: RequestTracking,
717        operation: Result<Operation, zx::Status>,
718        vmo: Option<Arc<zx::Vmo>>,
719        offset_map: Option<&OffsetMap>,
720    ) -> Self {
721        let operation =
722            operation.and_then(|operation| operation.validate_and_adjust_range(offset_map));
723        Self { request_tracking, operation, vmo }
724    }
725}
726
/// Options attached to a write operation; re-exported from `block_protocol`.
/// cbindgen:no-export
pub type WriteOptions = block_protocol::WriteOptions;
729
/// A decoded block operation.
#[repr(C)]
#[derive(Debug)]
pub enum Operation {
    // NOTE: On the C++ side, this ends up as a union and, for efficiency reasons, there is code
    // that assumes that some fields for reads and writes (and possibly trim) line-up (e.g. common
    // code can read `device_block_offset` from the read variant and then assume it's valid for the
    // write variant).
    /// Decoded from `BlockOpcode::Read`.
    Read {
        /// The request's device offset, in blocks (remapped when an offset map is in use).
        device_block_offset: u64,
        /// The request's length, in blocks.
        block_count: u32,
        // Always zero; presumably keeps the layout lined up with `Write::options` (see NOTE).
        _unused: u32,
        /// The request's VMO offset multiplied by the block size.
        vmo_offset: u64,
    },
    /// Decoded from `BlockOpcode::Write`.
    Write {
        /// The request's device offset, in blocks (remapped when an offset map is in use).
        device_block_offset: u64,
        /// The request's length, in blocks.
        block_count: u32,
        /// `FORCE_ACCESS` if the request carried `BlockIoFlag::FORCE_ACCESS`, otherwise empty.
        options: WriteOptions,
        /// The request's VMO offset multiplied by the block size.
        vmo_offset: u64,
    },
    /// Decoded from `BlockOpcode::Flush`.
    Flush,
    /// Decoded from `BlockOpcode::Trim`.
    Trim {
        /// The request's device offset, in blocks (remapped when an offset map is in use).
        device_block_offset: u64,
        /// The request's length, in blocks.
        block_count: u32,
    },
    /// This will never be seen by the C interface.
    CloseVmo,
}
757
758impl Operation {
759    // Adjusts the operation's block range via `OffsetMap`.  If the request would exceed the range
760    // known to the map, or is otherwise invalid, returns an error.
761    fn validate_and_adjust_range(
762        mut self,
763        offset_map: Option<&OffsetMap>,
764    ) -> Result<Operation, zx::Status> {
765        let adjust_offset = |dev_offset, length| {
766            // For compatibility with the C++ server, always ensure the length is nonzero
767            if length == 0 {
768                return Err(zx::Status::INVALID_ARGS);
769            }
770            if let Some(offset_map) = offset_map {
771                offset_map.adjust_request(dev_offset, length as u64).ok_or(zx::Status::OUT_OF_RANGE)
772            } else {
773                Ok(dev_offset)
774            }
775        };
776        match &mut self {
777            Operation::Read { device_block_offset, block_count, .. } => {
778                *device_block_offset = adjust_offset(*device_block_offset, *block_count)?;
779            }
780            Operation::Write { device_block_offset, block_count, .. } => {
781                *device_block_offset = adjust_offset(*device_block_offset, *block_count)?;
782            }
783            Operation::Trim { device_block_offset, block_count, .. } => {
784                *device_block_offset = adjust_offset(*device_block_offset, *block_count)?;
785            }
786            _ => {}
787        }
788        Ok(self)
789    }
790    fn trace_label(&self) -> &'static str {
791        match self {
792            Operation::Read { .. } => "read",
793            Operation::Write { .. } => "write",
794            Operation::Flush { .. } => "flush",
795            Operation::Trim { .. } => "trim",
796            Operation::CloseVmo { .. } => "close_vmo",
797        }
798    }
799}
800
// Bookkeeping attached to a decoded request so that its response can be matched up later.
#[derive(Clone, Copy, Debug)]
pub struct RequestTracking {
    // Whether the request belongs to a group or stands alone, together with the relevant id.
    group_or_request: GroupOrRequest,
    // Trace flow id copied from the request; `None` when the request carried zero.
    trace_flow_id: Option<NonZero<u64>>,
}
806
/// Distinguishes a request that belongs to a group from one that stands alone.
#[derive(Clone, Copy, Debug)]
pub enum GroupOrRequest {
    /// Part of a group; carries the `u16` group id.
    Group(u16),
    /// A stand-alone request; carries the `u32` request id.
    Request(u32),
}

impl GroupOrRequest {
    /// Returns `true` only for the `Group` variant.
    fn is_group(&self) -> bool {
        matches!(self, Self::Group(_))
    }

    /// Returns the group id, or `0` when this is a stand-alone request.
    fn group_id(&self) -> u16 {
        if let Self::Group(id) = self {
            *id
        } else {
            0
        }
    }
}
829
// Top bit of a `RequestId`: set when the id was built from a group rather than a single request.
/// cbindgen:ignore
const IS_GROUP: u64 = 1 << 63;
// Second-highest bit of a `RequestId`: set by `RequestId::with_vmo`.
/// cbindgen:ignore
const USED_VMO: u64 = 1 << 62;
#[repr(transparent)]
#[derive(Clone, Copy, Eq, PartialEq, Hash)]
pub struct RequestId(u64);

impl RequestId {
    /// Returns a copy of this id flagged as having used a VMO, so that the VMO is kept alive
    /// until the request has finished.
    fn with_vmo(self) -> Self {
        let Self(raw) = self;
        Self(raw | USED_VMO)
    }

    /// Reports whether this id carries the "used a VMO" flag.
    fn did_have_vmo(&self) -> bool {
        let Self(raw) = *self;
        (raw & USED_VMO) == USED_VMO
    }
}
850
851impl From<GroupOrRequest> for RequestId {
852    fn from(value: GroupOrRequest) -> Self {
853        match value {
854            GroupOrRequest::Group(group) => RequestId(group as u64 | IS_GROUP),
855            GroupOrRequest::Request(request) => RequestId(request as u64),
856        }
857    }
858}
859
860impl From<RequestId> for GroupOrRequest {
861    fn from(value: RequestId) -> Self {
862        if value.0 & IS_GROUP == 0 {
863            GroupOrRequest::Request(value.0 as u32)
864        } else {
865            GroupOrRequest::Group(value.0 as u16)
866        }
867    }
868}
869
870#[cfg(test)]
871mod tests {
872    use super::{BlockServer, DeviceInfo, PartitionInfo};
873    use crate::async_interface::FIFO_MAX_REQUESTS;
874    use assert_matches::assert_matches;
875    use block_protocol::{BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, WriteOptions};
876    use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
877    use fuchsia_async::{FifoReadable as _, FifoWritable as _};
878    use futures::channel::oneshot;
879    use futures::future::BoxFuture;
880    use futures::FutureExt as _;
881    use std::borrow::Cow;
882    use std::future::poll_fn;
883    use std::num::NonZero;
884    use std::pin::pin;
885    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
886    use std::sync::{Arc, Mutex};
887    use std::task::{Context, Poll};
888    use zx::{AsHandleRef as _, HandleBased as _};
889    use {
890        fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
891        fuchsia_async as fasync,
892    };
893
    /// Test double for `async_interface::Interface` whose `read` behaviour can be customised.
    #[derive(Default)]
    struct MockInterface {
        // Optional callback invoked by `read` with (device_block_offset, block_count, vmo,
        // vmo_offset); the output of the future it returns becomes the result of the read.
        // When unset, `read` hits `unimplemented!()`.
        read_hook: Option<
            Box<
                dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
                    + Send
                    + Sync,
            >,
        >,
    }
904
905    impl super::async_interface::Interface for MockInterface {
906        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
907            Ok(())
908        }
909
910        async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
911            Ok(Cow::Owned(test_device_info()))
912        }
913
914        async fn read(
915            &self,
916            device_block_offset: u64,
917            block_count: u32,
918            vmo: &Arc<zx::Vmo>,
919            vmo_offset: u64,
920            _trace_flow_id: Option<NonZero<u64>>,
921        ) -> Result<(), zx::Status> {
922            if let Some(read_hook) = &self.read_hook {
923                read_hook(device_block_offset, block_count, vmo, vmo_offset).await
924            } else {
925                unimplemented!();
926            }
927        }
928
929        async fn write(
930            &self,
931            _device_block_offset: u64,
932            _block_count: u32,
933            _vmo: &Arc<zx::Vmo>,
934            _vmo_offset: u64,
935            _opts: WriteOptions,
936            _trace_flow_id: Option<NonZero<u64>>,
937        ) -> Result<(), zx::Status> {
938            unreachable!();
939        }
940
941        async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
942            unreachable!();
943        }
944
945        async fn trim(
946            &self,
947            _device_block_offset: u64,
948            _block_count: u32,
949            _trace_flow_id: Option<NonZero<u64>>,
950        ) -> Result<(), zx::Status> {
951            unreachable!();
952        }
953
954        async fn get_volume_info(
955            &self,
956        ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
957            // Hang forever for the test_requests_dont_block_sessions test.
958            let () = std::future::pending().await;
959            unreachable!();
960        }
961    }
962
    // Block size handed to every `BlockServer::new` in these tests (presumably in bytes: the I/O
    // mock divides the byte-looking `vmo_offset` by it to recover a block index).
    const BLOCK_SIZE: u32 = 512;
964
965    fn test_device_info() -> DeviceInfo {
966        DeviceInfo::Partition(PartitionInfo {
967            device_flags: fblock::Flag::READONLY,
968            block_range: Some(12..34),
969            type_guid: [1; 16],
970            instance_guid: [2; 16],
971            name: "foo".to_string(),
972            flags: 0xabcd,
973        })
974    }
975
976    #[fuchsia::test]
977    async fn test_info() {
978        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
979
980        futures::join!(
981            async {
982                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
983                block_server.handle_requests(stream).await.unwrap();
984            },
985            async {
986                let expected_info = test_device_info();
987                let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
988                    info
989                } else {
990                    unreachable!()
991                };
992
993                let block_info = proxy.get_info().await.unwrap().unwrap();
994                assert_eq!(
995                    block_info.block_count,
996                    partition_info.block_range.as_ref().unwrap().end
997                        - partition_info.block_range.as_ref().unwrap().start
998                );
999                assert_eq!(block_info.flags, fblock::Flag::READONLY);
1000
1001                // TODO(https://fxbug.dev/348077960): Check max_transfer_size
1002
1003                let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1004                assert_eq!(status, zx::sys::ZX_OK);
1005                assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1006
1007                let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1008                assert_eq!(status, zx::sys::ZX_OK);
1009                assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1010
1011                let (status, name) = proxy.get_name().await.unwrap();
1012                assert_eq!(status, zx::sys::ZX_OK);
1013                assert_eq!(name.as_ref(), Some(&partition_info.name));
1014
1015                let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1016                assert_eq!(metadata.name, name);
1017                assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1018                assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1019                assert_eq!(
1020                    metadata.start_block_offset,
1021                    Some(partition_info.block_range.as_ref().unwrap().start)
1022                );
1023                assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1024                assert_eq!(metadata.flags, Some(partition_info.flags));
1025
1026                std::mem::drop(proxy);
1027            }
1028        );
1029    }
1030
    /// Attaches VMOs to a session until the server refuses with `ZX_ERR_NO_RESOURCES`, issuing an
    /// occasional read along the way, then checks that closing a VMO frees its id for reuse.
    #[fuchsia::test]
    async fn test_attach_vmo() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();

        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
        // Remembered so that the read hook can verify which VMO the server passes to `read`.
        let koid = vmo.get_koid().unwrap();

        futures::join!(
            async {
                let block_server = BlockServer::new(
                    BLOCK_SIZE,
                    Arc::new(MockInterface {
                        // Every read must be handed a VMO with the koid recorded above.
                        read_hook: Some(Box::new(move |_, _, vmo, _| {
                            assert_eq!(vmo.get_koid().unwrap(), koid);
                            Box::pin(async { Ok(()) })
                        })),
                        ..MockInterface::default()
                    }),
                );
                block_server.handle_requests(stream).await.unwrap();
            },
            async move {
                let (session_proxy, server) = fidl::endpoints::create_proxy();

                proxy.open_session(server).unwrap();

                let vmo_id = session_proxy
                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                    .await
                    .unwrap()
                    .unwrap();
                assert_ne!(vmo_id.id, 0);

                let fifo =
                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());

                // Keep attaching VMOs until we eventually hit the maximum.
                // `count` starts at 1 to account for the VMO attached above.
                let mut count = 1;
                loop {
                    match session_proxy
                        .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                        .await
                        .unwrap()
                    {
                        Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
                        Err(e) => {
                            assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
                            break;
                        }
                    }

                    // Only test every 10 to keep test time down.
                    // NOTE(review): `vmo_id` below is the first VMO attached before the loop (the
                    // binding in the `Ok` arm above is scoped to that arm), so the read always
                    // targets the original id -- confirm that this is intended.
                    if count % 10 == 0 {
                        fifo.write_entries(&BlockFifoRequest {
                            command: BlockFifoCommand {
                                opcode: BlockOpcode::Read.into_primitive(),
                                ..Default::default()
                            },
                            vmoid: vmo_id.id,
                            length: 1,
                            ..Default::default()
                        })
                        .await
                        .unwrap();

                        let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
                        assert_eq!(response.status, zx::sys::ZX_OK);
                    }

                    count += 1;
                }

                // The number of successful attachments must equal `u16::MAX`.
                assert_eq!(count, u16::MAX as u64);

                // Detach the original VMO, and make sure we can then attach another one.
                fifo.write_entries(&BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::CloseVmo.into_primitive(),
                        ..Default::default()
                    },
                    vmoid: vmo_id.id,
                    ..Default::default()
                })
                .await
                .unwrap();

                let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
                assert_eq!(response.status, zx::sys::ZX_OK);

                let new_vmo_id = session_proxy
                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                    .await
                    .unwrap()
                    .unwrap();
                // It should reuse the same ID.
                assert_eq!(new_vmo_id.id, vmo_id.id);

                std::mem::drop(proxy);
            }
        );
    }
1132
    /// Checks that calling `close` on a session lets the server's `handle_requests` finish even
    /// though the client keeps the session proxy alive.
    #[fuchsia::test]
    async fn test_close() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();

        // Server future; expected to complete once the session has been closed.
        let mut server = std::pin::pin!(async {
            let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
            block_server.handle_requests(stream).await.unwrap();
        }
        .fuse());

        // Client future; by construction it never completes (it ends in `pending()`).
        let mut client = std::pin::pin!(async {
            let (session_proxy, server) = fidl::endpoints::create_proxy();

            proxy.open_session(server).unwrap();

            // Dropping the proxy should not cause the session to terminate because the session is
            // still live.
            std::mem::drop(proxy);

            session_proxy.close().await.unwrap().unwrap();

            // Keep the session alive.  Calling `close` should cause the server to terminate.
            let _: () = std::future::pending().await;
        }
        .fuse());

        // Only the server arm may win the race; the client arm can never be ready.
        futures::select!(
            _ = server => {}
            _ = client => unreachable!(),
        );
    }
1164
    /// Test double for `async_interface::Interface` used by the I/O tests: it can either verify
    /// each operation against an expectation or fail every operation with a fixed status.
    #[derive(Default)]
    struct IoMockInterface {
        // When true (and `return_errors` is false), every operation takes the value out of
        // `expected_op` and asserts that it matches the arguments received.
        do_checks: bool,
        // Slot shared with the test body that holds the next operation the test expects.
        expected_op: Arc<Mutex<Option<ExpectedOp>>>,
        // When true, every I/O method returns its fixed error status without any checks.
        return_errors: bool,
    }
1171
    /// The operation (and arguments) that `IoMockInterface` should see next.
    #[derive(Debug)]
    enum ExpectedOp {
        // (device_block_offset, block_count, vmo offset in blocks -- the mock compares it with
        // `vmo_offset / BLOCK_SIZE`).
        Read(u64, u32, u64),
        // Same fields as `Read`.
        Write(u64, u32, u64),
        // (device_block_offset, block_count).
        Trim(u64, u32),
        Flush,
    }
1179
1180    impl super::async_interface::Interface for IoMockInterface {
1181        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1182            Ok(())
1183        }
1184
1185        async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1186            Ok(Cow::Owned(test_device_info()))
1187        }
1188
1189        async fn read(
1190            &self,
1191            device_block_offset: u64,
1192            block_count: u32,
1193            _vmo: &Arc<zx::Vmo>,
1194            vmo_offset: u64,
1195            _trace_flow_id: Option<NonZero<u64>>,
1196        ) -> Result<(), zx::Status> {
1197            if self.return_errors {
1198                Err(zx::Status::INTERNAL)
1199            } else {
1200                if self.do_checks {
1201                    assert_matches!(
1202                        self.expected_op.lock().unwrap().take(),
1203                        Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1204                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1205                        "Read {device_block_offset} {block_count} {vmo_offset}"
1206                    );
1207                }
1208                Ok(())
1209            }
1210        }
1211
1212        async fn write(
1213            &self,
1214            device_block_offset: u64,
1215            block_count: u32,
1216            _vmo: &Arc<zx::Vmo>,
1217            vmo_offset: u64,
1218            _opts: WriteOptions,
1219            _trace_flow_id: Option<NonZero<u64>>,
1220        ) -> Result<(), zx::Status> {
1221            if self.return_errors {
1222                Err(zx::Status::NOT_SUPPORTED)
1223            } else {
1224                if self.do_checks {
1225                    assert_matches!(
1226                        self.expected_op.lock().unwrap().take(),
1227                        Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1228                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1229                        "Write {device_block_offset} {block_count} {vmo_offset}"
1230                    );
1231                }
1232                Ok(())
1233            }
1234        }
1235
1236        async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1237            if self.return_errors {
1238                Err(zx::Status::NO_RESOURCES)
1239            } else {
1240                if self.do_checks {
1241                    assert_matches!(
1242                        self.expected_op.lock().unwrap().take(),
1243                        Some(ExpectedOp::Flush)
1244                    );
1245                }
1246                Ok(())
1247            }
1248        }
1249
1250        async fn trim(
1251            &self,
1252            device_block_offset: u64,
1253            block_count: u32,
1254            _trace_flow_id: Option<NonZero<u64>>,
1255        ) -> Result<(), zx::Status> {
1256            if self.return_errors {
1257                Err(zx::Status::NO_MEMORY)
1258            } else {
1259                if self.do_checks {
1260                    assert_matches!(
1261                        self.expected_op.lock().unwrap().take(),
1262                        Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1263                            block_count == b,
1264                        "Trim {device_block_offset} {block_count}"
1265                    );
1266                }
1267                Ok(())
1268            }
1269        }
1270    }
1271
    /// Sends one read, write, flush and trim over the session FIFO and checks that each reaches
    /// the interface with the expected arguments and is answered with `ZX_OK`.
    #[fuchsia::test]
    async fn test_io() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();

        // Shared slot: the client stores the expectation, the mock takes and checks it.
        let expected_op = Arc::new(Mutex::new(None));
        let expected_op_clone = expected_op.clone();
        futures::join!(
            async {
                let block_server = BlockServer::new(
                    BLOCK_SIZE,
                    Arc::new(IoMockInterface {
                        return_errors: false,
                        do_checks: true,
                        expected_op: expected_op_clone,
                    }),
                );
                block_server.handle_requests(stream).await.unwrap();
            },
            async move {
                let (session_proxy, server) = fidl::endpoints::create_proxy();

                proxy.open_session(server).unwrap();

                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
                let vmo_id = session_proxy
                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                    .await
                    .unwrap()
                    .unwrap();

                let fifo =
                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());

                // READ
                // The expectation mirrors (dev_offset, length, vmo_offset) of the request below.
                *expected_op.lock().unwrap() = Some(ExpectedOp::Read(1, 2, 3));
                fifo.write_entries(&BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Read.into_primitive(),
                        ..Default::default()
                    },
                    vmoid: vmo_id.id,
                    dev_offset: 1,
                    length: 2,
                    vmo_offset: 3,
                    ..Default::default()
                })
                .await
                .unwrap();

                let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
                assert_eq!(response.status, zx::sys::ZX_OK);

                // WRITE
                *expected_op.lock().unwrap() = Some(ExpectedOp::Write(4, 5, 6));
                fifo.write_entries(&BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Write.into_primitive(),
                        ..Default::default()
                    },
                    vmoid: vmo_id.id,
                    dev_offset: 4,
                    length: 5,
                    vmo_offset: 6,
                    ..Default::default()
                })
                .await
                .unwrap();

                let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
                assert_eq!(response.status, zx::sys::ZX_OK);

                // FLUSH
                // No VMO, offset or length is supplied with the flush request.
                *expected_op.lock().unwrap() = Some(ExpectedOp::Flush);
                fifo.write_entries(&BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Flush.into_primitive(),
                        ..Default::default()
                    },
                    ..Default::default()
                })
                .await
                .unwrap();

                let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
                assert_eq!(response.status, zx::sys::ZX_OK);

                // TRIM
                // Only a device range is supplied; no VMO is named.
                *expected_op.lock().unwrap() = Some(ExpectedOp::Trim(7, 8));
                fifo.write_entries(&BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Trim.into_primitive(),
                        ..Default::default()
                    },
                    dev_offset: 7,
                    length: 8,
                    ..Default::default()
                })
                .await
                .unwrap();

                let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
                assert_eq!(response.status, zx::sys::ZX_OK);

                std::mem::drop(proxy);
            }
        );
    }
1379
    /// Checks that an error returned by the interface for a read, write, flush or trim is
    /// reported back to the client as the status of the corresponding FIFO response.
    #[fuchsia::test]
    async fn test_io_errors() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();

        futures::join!(
            async {
                // With `return_errors` set, the mock fails each kind of operation with its own
                // distinct status, which is what the assertions below rely on.
                let block_server = BlockServer::new(
                    BLOCK_SIZE,
                    Arc::new(IoMockInterface {
                        return_errors: true,
                        do_checks: false,
                        expected_op: Arc::new(Mutex::new(None)),
                    }),
                );
                block_server.handle_requests(stream).await.unwrap();
            },
            async move {
                let (session_proxy, server) = fidl::endpoints::create_proxy();

                proxy.open_session(server).unwrap();

                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
                let vmo_id = session_proxy
                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                    .await
                    .unwrap()
                    .unwrap();

                let fifo =
                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());

                // READ
                // The mock's `read` fails with `INTERNAL`.
                fifo.write_entries(&BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Read.into_primitive(),
                        ..Default::default()
                    },
                    vmoid: vmo_id.id,
                    length: 1,
                    reqid: 1,
                    ..Default::default()
                })
                .await
                .unwrap();

                let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
                assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);

                // WRITE
                // The mock's `write` fails with `NOT_SUPPORTED`.
                fifo.write_entries(&BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Write.into_primitive(),
                        ..Default::default()
                    },
                    vmoid: vmo_id.id,
                    length: 1,
                    reqid: 2,
                    ..Default::default()
                })
                .await
                .unwrap();

                let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);

                // FLUSH
                // The mock's `flush` fails with `NO_RESOURCES`.
                fifo.write_entries(&BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Flush.into_primitive(),
                        ..Default::default()
                    },
                    reqid: 3,
                    ..Default::default()
                })
                .await
                .unwrap();

                let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
                assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);

                // TRIM
                // The mock's `trim` fails with `NO_MEMORY`.
                fifo.write_entries(&BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Trim.into_primitive(),
                        ..Default::default()
                    },
                    reqid: 4,
                    length: 1,
                    ..Default::default()
                })
                .await
                .unwrap();

                let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
                assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);

                std::mem::drop(proxy);
            }
        );
    }
1480
    /// Sends malformed read, write and close-VMO requests and checks the status each is rejected
    /// with.
    #[fuchsia::test]
    async fn test_invalid_args() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();

        futures::join!(
            async {
                // The mock neither checks arguments nor fails, so any error seen by the client
                // below does not originate from the mock's I/O methods.
                let block_server = BlockServer::new(
                    BLOCK_SIZE,
                    Arc::new(IoMockInterface {
                        return_errors: false,
                        do_checks: false,
                        expected_op: Arc::new(Mutex::new(None)),
                    }),
                );
                block_server.handle_requests(stream).await.unwrap();
            },
            async move {
                let (session_proxy, server) = fidl::endpoints::create_proxy();

                proxy.open_session(server).unwrap();

                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
                let vmo_id = session_proxy
                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
                    .await
                    .unwrap()
                    .unwrap();

                let fifo =
                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());

                // Sends one request, waits for its response and converts the response status
                // into a `Result`.
                async fn test(
                    fifo: &fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
                    request: BlockFifoRequest,
                ) -> Result<(), zx::Status> {
                    fifo.write_entries(&request).await.unwrap();
                    let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
                    zx::Status::ok(response.status)
                }

                // READ

                // Template read request naming the attached VMO; each case overrides one field.
                let good_read_request = || BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Read.into_primitive(),
                        ..Default::default()
                    },
                    vmoid: vmo_id.id,
                    ..Default::default()
                };

                // A VMO id other than the one that was attached.
                assert_eq!(
                    test(&fifo, BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() })
                        .await,
                    Err(zx::Status::INVALID_ARGS)
                );

                // The largest possible VMO offset.
                assert_eq!(
                    test(
                        &fifo,
                        BlockFifoRequest {
                            vmo_offset: 0xffff_ffff_ffff_ffff,
                            ..good_read_request()
                        }
                    )
                    .await,
                    Err(zx::Status::INVALID_ARGS)
                );

                // WRITE

                // Template write request; the same two cases as for reads follow.
                let good_write_request = || BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Write.into_primitive(),
                        ..Default::default()
                    },
                    vmoid: vmo_id.id,
                    ..Default::default()
                };

                assert_eq!(
                    test(&fifo, BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() })
                        .await,
                    Err(zx::Status::INVALID_ARGS)
                );

                assert_eq!(
                    test(
                        &fifo,
                        BlockFifoRequest {
                            vmo_offset: 0xffff_ffff_ffff_ffff,
                            ..good_write_request()
                        }
                    )
                    .await,
                    Err(zx::Status::INVALID_ARGS)
                );

                // CLOSE VMO

                // Closing a VMO id that was never attached is reported as `IO`.
                assert_eq!(
                    test(
                        &fifo,
                        BlockFifoRequest {
                            command: BlockFifoCommand {
                                opcode: BlockOpcode::CloseVmo.into_primitive(),
                                ..Default::default()
                            },
                            vmoid: vmo_id.id + 1,
                            ..Default::default()
                        }
                    )
                    .await,
                    Err(zx::Status::IO)
                );

                std::mem::drop(proxy);
            }
        );
    }
1601
1602    #[fuchsia::test]
1603    async fn test_concurrent_requests() {
1604        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1605
1606        let waiting_readers = Arc::new(Mutex::new(Vec::new()));
1607        let waiting_readers_clone = waiting_readers.clone();
1608
1609        futures::join!(
1610            async move {
1611                let block_server = BlockServer::new(
1612                    BLOCK_SIZE,
1613                    Arc::new(MockInterface {
1614                        read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
1615                            let (tx, rx) = oneshot::channel();
1616                            waiting_readers_clone
1617                                .lock()
1618                                .unwrap()
1619                                .push((dev_block_offset as u32, tx));
1620                            Box::pin(async move {
1621                                let _ = rx.await;
1622                                Ok(())
1623                            })
1624                        })),
1625                    }),
1626                );
1627                block_server.handle_requests(stream).await.unwrap();
1628            },
1629            async move {
1630                let (session_proxy, server) = fidl::endpoints::create_proxy();
1631
1632                proxy.open_session(server).unwrap();
1633
1634                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1635                let vmo_id = session_proxy
1636                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1637                    .await
1638                    .unwrap()
1639                    .unwrap();
1640
1641                let fifo =
1642                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1643
1644                fifo.write_entries(&BlockFifoRequest {
1645                    command: BlockFifoCommand {
1646                        opcode: BlockOpcode::Read.into_primitive(),
1647                        ..Default::default()
1648                    },
1649                    reqid: 1,
1650                    dev_offset: 1, // Intentionally use the same as `reqid`.
1651                    vmoid: vmo_id.id,
1652                    length: 1,
1653                    ..Default::default()
1654                })
1655                .await
1656                .unwrap();
1657
1658                fifo.write_entries(&BlockFifoRequest {
1659                    command: BlockFifoCommand {
1660                        opcode: BlockOpcode::Read.into_primitive(),
1661                        ..Default::default()
1662                    },
1663                    reqid: 2,
1664                    dev_offset: 2,
1665                    vmoid: vmo_id.id,
1666                    length: 1,
1667                    ..Default::default()
1668                })
1669                .await
1670                .unwrap();
1671
1672                // Wait till both those entries are pending.
1673                poll_fn(|cx: &mut Context<'_>| {
1674                    if waiting_readers.lock().unwrap().len() == 2 {
1675                        Poll::Ready(())
1676                    } else {
1677                        // Yield to the executor.
1678                        cx.waker().wake_by_ref();
1679                        Poll::Pending
1680                    }
1681                })
1682                .await;
1683
1684                assert!(futures::poll!(fifo.read_entry()).is_pending());
1685
1686                let (id, tx) = waiting_readers.lock().unwrap().pop().unwrap();
1687                tx.send(()).unwrap();
1688
1689                let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
1690                assert_eq!(response.status, zx::sys::ZX_OK);
1691                assert_eq!(response.reqid, id);
1692
1693                assert!(futures::poll!(fifo.read_entry()).is_pending());
1694
1695                let (id, tx) = waiting_readers.lock().unwrap().pop().unwrap();
1696                tx.send(()).unwrap();
1697
1698                let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
1699                assert_eq!(response.status, zx::sys::ZX_OK);
1700                assert_eq!(response.reqid, id);
1701            }
1702        );
1703    }
1704
1705    #[fuchsia::test]
1706    async fn test_groups() {
1707        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1708
1709        futures::join!(
1710            async move {
1711                let block_server = BlockServer::new(
1712                    BLOCK_SIZE,
1713                    Arc::new(MockInterface {
1714                        read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
1715                    }),
1716                );
1717                block_server.handle_requests(stream).await.unwrap();
1718            },
1719            async move {
1720                let (session_proxy, server) = fidl::endpoints::create_proxy();
1721
1722                proxy.open_session(server).unwrap();
1723
1724                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1725                let vmo_id = session_proxy
1726                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1727                    .await
1728                    .unwrap()
1729                    .unwrap();
1730
1731                let fifo =
1732                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1733
1734                fifo.write_entries(&BlockFifoRequest {
1735                    command: BlockFifoCommand {
1736                        opcode: BlockOpcode::Read.into_primitive(),
1737                        flags: BlockIoFlag::GROUP_ITEM.bits(),
1738                        ..Default::default()
1739                    },
1740                    group: 1,
1741                    vmoid: vmo_id.id,
1742                    length: 1,
1743                    ..Default::default()
1744                })
1745                .await
1746                .unwrap();
1747
1748                fifo.write_entries(&BlockFifoRequest {
1749                    command: BlockFifoCommand {
1750                        opcode: BlockOpcode::Read.into_primitive(),
1751                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1752                        ..Default::default()
1753                    },
1754                    reqid: 2,
1755                    group: 1,
1756                    vmoid: vmo_id.id,
1757                    length: 1,
1758                    ..Default::default()
1759                })
1760                .await
1761                .unwrap();
1762
1763                let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
1764                assert_eq!(response.status, zx::sys::ZX_OK);
1765                assert_eq!(response.reqid, 2);
1766                assert_eq!(response.group, 1);
1767            }
1768        );
1769    }
1770
1771    #[fuchsia::test]
1772    async fn test_group_error() {
1773        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1774
1775        let counter = Arc::new(AtomicU64::new(0));
1776        let counter_clone = counter.clone();
1777
1778        futures::join!(
1779            async move {
1780                let block_server = BlockServer::new(
1781                    BLOCK_SIZE,
1782                    Arc::new(MockInterface {
1783                        read_hook: Some(Box::new(move |_, _, _, _| {
1784                            counter_clone.fetch_add(1, Ordering::Relaxed);
1785                            Box::pin(async { Err(zx::Status::BAD_STATE) })
1786                        })),
1787                    }),
1788                );
1789                block_server.handle_requests(stream).await.unwrap();
1790            },
1791            async move {
1792                let (session_proxy, server) = fidl::endpoints::create_proxy();
1793
1794                proxy.open_session(server).unwrap();
1795
1796                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1797                let vmo_id = session_proxy
1798                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1799                    .await
1800                    .unwrap()
1801                    .unwrap();
1802
1803                let fifo =
1804                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1805
1806                fifo.write_entries(&BlockFifoRequest {
1807                    command: BlockFifoCommand {
1808                        opcode: BlockOpcode::Read.into_primitive(),
1809                        flags: BlockIoFlag::GROUP_ITEM.bits(),
1810                        ..Default::default()
1811                    },
1812                    group: 1,
1813                    vmoid: vmo_id.id,
1814                    length: 1,
1815                    ..Default::default()
1816                })
1817                .await
1818                .unwrap();
1819
1820                // Wait until processed.
1821                poll_fn(|cx: &mut Context<'_>| {
1822                    if counter.load(Ordering::Relaxed) == 1 {
1823                        Poll::Ready(())
1824                    } else {
1825                        // Yield to the executor.
1826                        cx.waker().wake_by_ref();
1827                        Poll::Pending
1828                    }
1829                })
1830                .await;
1831
1832                assert!(futures::poll!(fifo.read_entry()).is_pending());
1833
1834                fifo.write_entries(&BlockFifoRequest {
1835                    command: BlockFifoCommand {
1836                        opcode: BlockOpcode::Read.into_primitive(),
1837                        flags: BlockIoFlag::GROUP_ITEM.bits(),
1838                        ..Default::default()
1839                    },
1840                    group: 1,
1841                    vmoid: vmo_id.id,
1842                    length: 1,
1843                    ..Default::default()
1844                })
1845                .await
1846                .unwrap();
1847
1848                fifo.write_entries(&BlockFifoRequest {
1849                    command: BlockFifoCommand {
1850                        opcode: BlockOpcode::Read.into_primitive(),
1851                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1852                        ..Default::default()
1853                    },
1854                    reqid: 2,
1855                    group: 1,
1856                    vmoid: vmo_id.id,
1857                    length: 1,
1858                    ..Default::default()
1859                })
1860                .await
1861                .unwrap();
1862
1863                let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
1864                assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
1865                assert_eq!(response.reqid, 2);
1866                assert_eq!(response.group, 1);
1867
1868                assert!(futures::poll!(fifo.read_entry()).is_pending());
1869
1870                // Only the first request should have been processed.
1871                assert_eq!(counter.load(Ordering::Relaxed), 1);
1872            }
1873        );
1874    }
1875
1876    #[fuchsia::test]
1877    async fn test_group_with_two_lasts() {
1878        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1879
1880        let (tx, rx) = oneshot::channel();
1881
1882        futures::join!(
1883            async move {
1884                let rx = Mutex::new(Some(rx));
1885                let block_server = BlockServer::new(
1886                    BLOCK_SIZE,
1887                    Arc::new(MockInterface {
1888                        read_hook: Some(Box::new(move |_, _, _, _| {
1889                            let rx = rx.lock().unwrap().take().unwrap();
1890                            Box::pin(async {
1891                                let _ = rx.await;
1892                                Ok(())
1893                            })
1894                        })),
1895                    }),
1896                );
1897                block_server.handle_requests(stream).await.unwrap();
1898            },
1899            async move {
1900                let (session_proxy, server) = fidl::endpoints::create_proxy();
1901
1902                proxy.open_session(server).unwrap();
1903
1904                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1905                let vmo_id = session_proxy
1906                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1907                    .await
1908                    .unwrap()
1909                    .unwrap();
1910
1911                let fifo =
1912                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1913
1914                fifo.write_entries(&BlockFifoRequest {
1915                    command: BlockFifoCommand {
1916                        opcode: BlockOpcode::Read.into_primitive(),
1917                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1918                        ..Default::default()
1919                    },
1920                    reqid: 1,
1921                    group: 1,
1922                    vmoid: vmo_id.id,
1923                    length: 1,
1924                    ..Default::default()
1925                })
1926                .await
1927                .unwrap();
1928
1929                fifo.write_entries(&BlockFifoRequest {
1930                    command: BlockFifoCommand {
1931                        opcode: BlockOpcode::Read.into_primitive(),
1932                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1933                        ..Default::default()
1934                    },
1935                    reqid: 2,
1936                    group: 1,
1937                    vmoid: vmo_id.id,
1938                    length: 1,
1939                    ..Default::default()
1940                })
1941                .await
1942                .unwrap();
1943
1944                // Send an independent request to flush through the fifo.
1945                fifo.write_entries(&BlockFifoRequest {
1946                    command: BlockFifoCommand {
1947                        opcode: BlockOpcode::CloseVmo.into_primitive(),
1948                        ..Default::default()
1949                    },
1950                    reqid: 3,
1951                    vmoid: vmo_id.id,
1952                    ..Default::default()
1953                })
1954                .await
1955                .unwrap();
1956
1957                // It should succeed.
1958                let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
1959                assert_eq!(response.status, zx::sys::ZX_OK);
1960                assert_eq!(response.reqid, 3);
1961
1962                // Now release the original request.
1963                tx.send(()).unwrap();
1964
1965                // The response should be for the first message tagged as last, and it should be
1966                // an error because we sent two messages with the LAST marker.
1967                let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
1968                assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
1969                assert_eq!(response.reqid, 1);
1970                assert_eq!(response.group, 1);
1971            }
1972        );
1973    }
1974
1975    #[fuchsia::test(allow_stalls = false)]
1976    async fn test_requests_dont_block_sessions() {
1977        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1978
1979        let (tx, rx) = oneshot::channel();
1980
1981        fasync::Task::local(async move {
1982            let rx = Mutex::new(Some(rx));
1983            let block_server = BlockServer::new(
1984                BLOCK_SIZE,
1985                Arc::new(MockInterface {
1986                    read_hook: Some(Box::new(move |_, _, _, _| {
1987                        let rx = rx.lock().unwrap().take().unwrap();
1988                        Box::pin(async {
1989                            let _ = rx.await;
1990                            Ok(())
1991                        })
1992                    })),
1993                }),
1994            );
1995            block_server.handle_requests(stream).await.unwrap();
1996        })
1997        .detach();
1998
1999        let mut fut = pin!(async {
2000            let (session_proxy, server) = fidl::endpoints::create_proxy();
2001
2002            proxy.open_session(server).unwrap();
2003
2004            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2005            let vmo_id = session_proxy
2006                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2007                .await
2008                .unwrap()
2009                .unwrap();
2010
2011            let fifo = fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2012
2013            fifo.write_entries(&BlockFifoRequest {
2014                command: BlockFifoCommand {
2015                    opcode: BlockOpcode::Read.into_primitive(),
2016                    flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2017                    ..Default::default()
2018                },
2019                reqid: 1,
2020                group: 1,
2021                vmoid: vmo_id.id,
2022                length: 1,
2023                ..Default::default()
2024            })
2025            .await
2026            .unwrap();
2027
2028            let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
2029            assert_eq!(response.status, zx::sys::ZX_OK);
2030        });
2031
2032        // The response won't come back until we send on `tx`.
2033        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2034
2035        let mut fut2 = pin!(proxy.get_volume_info());
2036
2037        // get_volume_info is set up to stall forever.
2038        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2039
2040        // If we now free up the first future, it should resolve; the stalled call to
2041        // get_volume_info should not block the fifo response.
2042        let _ = tx.send(());
2043
2044        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2045    }
2046
2047    #[fuchsia::test]
2048    async fn test_request_flow_control() {
2049        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2050
2051        // The client will ensure that MAX_REQUESTS are queued up before firing `event`, and the
2052        // server will block until that happens.
2053        const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2054        let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2055        let event_clone = event.clone();
2056        futures::join!(
2057            async move {
2058                let block_server = BlockServer::new(
2059                    BLOCK_SIZE,
2060                    Arc::new(MockInterface {
2061                        read_hook: Some(Box::new(move |_, _, _, _| {
2062                            let event_clone = event_clone.clone();
2063                            Box::pin(async move {
2064                                if !event_clone.1.load(Ordering::SeqCst) {
2065                                    event_clone.0.listen().await;
2066                                }
2067                                Ok(())
2068                            })
2069                        })),
2070                    }),
2071                );
2072                block_server.handle_requests(stream).await.unwrap();
2073            },
2074            async move {
2075                let (session_proxy, server) = fidl::endpoints::create_proxy();
2076
2077                proxy.open_session(server).unwrap();
2078
2079                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2080                let vmo_id = session_proxy
2081                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2082                    .await
2083                    .unwrap()
2084                    .unwrap();
2085
2086                let fifo =
2087                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2088
2089                for i in 0..MAX_REQUESTS {
2090                    fifo.write_entries(&BlockFifoRequest {
2091                        command: BlockFifoCommand {
2092                            opcode: BlockOpcode::Read.into_primitive(),
2093                            ..Default::default()
2094                        },
2095                        reqid: (i + 1) as u32,
2096                        dev_offset: i,
2097                        vmoid: vmo_id.id,
2098                        length: 1,
2099                        ..Default::default()
2100                    })
2101                    .await
2102                    .unwrap();
2103                }
2104                assert!(futures::poll!(fifo.write_entries(&BlockFifoRequest {
2105                    command: BlockFifoCommand {
2106                        opcode: BlockOpcode::Read.into_primitive(),
2107                        ..Default::default()
2108                    },
2109                    reqid: u32::MAX,
2110                    dev_offset: MAX_REQUESTS,
2111                    vmoid: vmo_id.id,
2112                    length: 1,
2113                    ..Default::default()
2114                }))
2115                .is_pending());
2116                // OK, let the server start to process.
2117                event.1.store(true, Ordering::SeqCst);
2118                event.0.notify(usize::MAX);
2119                // For each entry we read, make sure we can write a new one in.
2120                let mut finished_reqids = vec![];
2121                for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2122                    let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
2123                    assert_eq!(response.status, zx::sys::ZX_OK);
2124                    finished_reqids.push(response.reqid);
2125                    fifo.write_entries(&BlockFifoRequest {
2126                        command: BlockFifoCommand {
2127                            opcode: BlockOpcode::Read.into_primitive(),
2128                            ..Default::default()
2129                        },
2130                        reqid: (i + 1) as u32,
2131                        dev_offset: i,
2132                        vmoid: vmo_id.id,
2133                        length: 1,
2134                        ..Default::default()
2135                    })
2136                    .await
2137                    .unwrap();
2138                }
2139                for _ in 0..MAX_REQUESTS {
2140                    let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
2141                    assert_eq!(response.status, zx::sys::ZX_OK);
2142                    finished_reqids.push(response.reqid);
2143                }
2144                // Verify that we got a response for each request.  Note that we can't assume FIFO
2145                // ordering.
2146                finished_reqids.sort();
2147                assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2148                let mut i = 1;
2149                for reqid in finished_reqids {
2150                    assert_eq!(reqid, i);
2151                    i += 1;
2152                }
2153            }
2154        );
2155    }
2156
2157    #[fuchsia::test]
2158    async fn test_passthrough_io_with_fixed_map() {
2159        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2160
2161        let expected_op = Arc::new(Mutex::new(None));
2162        let expected_op_clone = expected_op.clone();
2163        futures::join!(
2164            async {
2165                let block_server = BlockServer::new(
2166                    BLOCK_SIZE,
2167                    Arc::new(IoMockInterface {
2168                        return_errors: false,
2169                        do_checks: true,
2170                        expected_op: expected_op_clone,
2171                    }),
2172                );
2173                block_server.handle_requests(stream).await.unwrap();
2174            },
2175            async move {
2176                let (session_proxy, server) = fidl::endpoints::create_proxy();
2177
2178                let mappings = [fblock::BlockOffsetMapping {
2179                    source_block_offset: 0,
2180                    target_block_offset: 10,
2181                    length: 20,
2182                }];
2183                proxy.open_session_with_offset_map(server, None, Some(&mappings[..])).unwrap();
2184
2185                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2186                let vmo_id = session_proxy
2187                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2188                    .await
2189                    .unwrap()
2190                    .unwrap();
2191
2192                let fifo =
2193                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2194
2195                // READ
2196                *expected_op.lock().unwrap() = Some(ExpectedOp::Read(11, 2, 3));
2197                fifo.write_entries(&BlockFifoRequest {
2198                    command: BlockFifoCommand {
2199                        opcode: BlockOpcode::Read.into_primitive(),
2200                        ..Default::default()
2201                    },
2202                    vmoid: vmo_id.id,
2203                    dev_offset: 1,
2204                    length: 2,
2205                    vmo_offset: 3,
2206                    ..Default::default()
2207                })
2208                .await
2209                .unwrap();
2210
2211                let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
2212                assert_eq!(response.status, zx::sys::ZX_OK);
2213
2214                // WRITE
2215                *expected_op.lock().unwrap() = Some(ExpectedOp::Write(14, 5, 6));
2216                fifo.write_entries(&BlockFifoRequest {
2217                    command: BlockFifoCommand {
2218                        opcode: BlockOpcode::Write.into_primitive(),
2219                        ..Default::default()
2220                    },
2221                    vmoid: vmo_id.id,
2222                    dev_offset: 4,
2223                    length: 5,
2224                    vmo_offset: 6,
2225                    ..Default::default()
2226                })
2227                .await
2228                .unwrap();
2229
2230                let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
2231                assert_eq!(response.status, zx::sys::ZX_OK);
2232
2233                // FLUSH
2234                *expected_op.lock().unwrap() = Some(ExpectedOp::Flush);
2235                fifo.write_entries(&BlockFifoRequest {
2236                    command: BlockFifoCommand {
2237                        opcode: BlockOpcode::Flush.into_primitive(),
2238                        ..Default::default()
2239                    },
2240                    ..Default::default()
2241                })
2242                .await
2243                .unwrap();
2244
2245                let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
2246                assert_eq!(response.status, zx::sys::ZX_OK);
2247
2248                // TRIM
2249                *expected_op.lock().unwrap() = Some(ExpectedOp::Trim(17, 3));
2250                fifo.write_entries(&BlockFifoRequest {
2251                    command: BlockFifoCommand {
2252                        opcode: BlockOpcode::Trim.into_primitive(),
2253                        ..Default::default()
2254                    },
2255                    dev_offset: 7,
2256                    length: 3,
2257                    ..Default::default()
2258                })
2259                .await
2260                .unwrap();
2261
2262                let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
2263                assert_eq!(response.status, zx::sys::ZX_OK);
2264
2265                // READ past window
2266                *expected_op.lock().unwrap() = None;
2267                fifo.write_entries(&BlockFifoRequest {
2268                    command: BlockFifoCommand {
2269                        opcode: BlockOpcode::Read.into_primitive(),
2270                        ..Default::default()
2271                    },
2272                    vmoid: vmo_id.id,
2273                    dev_offset: 19,
2274                    length: 2,
2275                    vmo_offset: 3,
2276                    ..Default::default()
2277                })
2278                .await
2279                .unwrap();
2280
2281                let response: BlockFifoResponse = fifo.read_entry().await.unwrap();
2282                assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2283
2284                std::mem::drop(proxy);
2285            }
2286        );
2287    }
2288}