_block_server_c_rustc_static/
lib.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use anyhow::{anyhow, Error};
5use block_protocol::{BlockFifoRequest, BlockFifoResponse};
6use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
7use futures::{Future, FutureExt as _, TryStreamExt as _};
8use std::borrow::Cow;
9use std::collections::btree_map::Entry;
10use std::collections::BTreeMap;
11use std::num::NonZero;
12use std::ops::Range;
13use std::sync::atomic::AtomicU64;
14use std::sync::{Arc, Mutex};
15use zx::HandleBased;
16use {
17    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
18    fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
19};
20
21pub mod async_interface;
22pub mod c_interface;
23
24#[derive(Clone)]
25pub enum DeviceInfo {
26    Block(BlockInfo),
27    Partition(PartitionInfo),
28}
29
30/// Information associated with non-partition block devices.
31#[derive(Clone)]
32pub struct BlockInfo {
33    pub device_flags: fblock::Flag,
34    pub block_count: u64,
35}
36
37/// Information associated with a block device that is also a partition.
38#[derive(Clone)]
39pub struct PartitionInfo {
40    /// The device flags reported by the underlying device.
41    pub device_flags: fblock::Flag,
42    /// If `block_range` is None, the partition is a volume and may not be contiguous.
43    /// In this case, the server will use the `get_volume_info` method to get the count of assigned
44    /// slices and use that (along with the slice and block sizes) to determine the block count.
45    pub block_range: Option<Range<u64>>,
46    pub type_guid: [u8; 16],
47    pub instance_guid: [u8; 16],
48    pub name: String,
49    pub flags: u64,
50}
51
52// Multiple Block I/O request may be sent as a group.
53// Notes:
54// - the group is identified by the group id in the request
55// - if using groups, a response will not be sent unless `BlockIoFlag::GROUP_LAST`
56//   flag is set.
57// - when processing a request of a group fails, subsequent requests of that
58//   group will not be processed.
59//
60// Refer to sdk/fidl/fuchsia.hardware.block.driver/block.fidl for details.
61//
62// FifoMessageGroup keeps track of the relevant BlockFifoResponse field for
63// a group requests. Only `status` and `count` needs to be updated.
64struct FifoMessageGroup {
65    status: zx::Status,
66    count: u32,
67    req_id: Option<u32>,
68}
69
70impl FifoMessageGroup {
71    fn new() -> Self {
72        FifoMessageGroup { status: zx::Status::OK, count: 0, req_id: None }
73    }
74}
75
76#[derive(Default)]
77struct FifoMessageGroups(Mutex<BTreeMap<u16, FifoMessageGroup>>);
78
79// Keeps track of all the group requests that are currently being processed
80impl FifoMessageGroups {
81    /// Completes a request and returns a response to be sent if it's the last outstanding request
82    /// for this group.
83    fn complete(&self, group_id: u16, status: zx::Status) -> Option<BlockFifoResponse> {
84        let mut map = self.0.lock().unwrap();
85        let Entry::Occupied(mut o) = map.entry(group_id) else { unreachable!() };
86        let group = o.get_mut();
87        if group.count == 1 {
88            if let Some(reqid) = group.req_id {
89                let status =
90                    if group.status != zx::Status::OK { group.status } else { status }.into_raw();
91
92                o.remove();
93
94                return Some(BlockFifoResponse {
95                    status,
96                    reqid,
97                    group: group_id,
98                    ..Default::default()
99                });
100            }
101        }
102
103        group.count = group.count.checked_sub(1).unwrap();
104        if status != zx::Status::OK && group.status == zx::Status::OK {
105            group.status = status
106        }
107        None
108    }
109}
110
111/// BlockServer is an implementation of fuchsia.hardware.block.partition.Partition.
112/// cbindgen:no-export
113pub struct BlockServer<SM> {
114    block_size: u32,
115    session_manager: Arc<SM>,
116}
117
118/// A single entry in `[OffsetMap]`.
119pub struct BlockOffsetMapping {
120    source_block_offset: u64,
121    target_block_offset: u64,
122    length: u64,
123}
124
125impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
126    type Error = zx::Status;
127
128    fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
129        wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
130        wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
131        Ok(Self {
132            source_block_offset: wire.source_block_offset,
133            target_block_offset: wire.target_block_offset,
134            length: wire.length,
135        })
136    }
137}
138
139/// Remaps the offset of block requests based on an internal map.
140/// TODO(https://fxbug.dev/402515764): For now, this just supports a single entry in the map, which
141/// is all that is required for GPT partitions.  If we want to support this for FVM, we will need
142/// to support multiple entries, which requires changing the block server to support request
143/// splitting.
144pub struct OffsetMap {
145    mapping: BlockOffsetMapping,
146}
147
148impl OffsetMap {
149    pub fn new(mapping: BlockOffsetMapping) -> Self {
150        Self { mapping }
151    }
152
153    /// Adjusts the requested range, returning the new dev_offset.
154    /// Returns false if the request would exceed the range known to OffsetMap.
155    pub fn adjust_request(&self, dev_offset: u64, length: u64) -> Option<u64> {
156        if length == 0 {
157            return None;
158        }
159        let end = dev_offset.checked_add(length)?;
160        if self.mapping.source_block_offset > dev_offset
161            || end > self.mapping.source_block_offset + self.mapping.length
162        {
163            return None;
164        }
165        let delta = dev_offset - self.mapping.source_block_offset;
166        Some(self.mapping.target_block_offset + delta)
167    }
168}
169
170// Methods take Arc<Self> rather than &self because of
171// https://github.com/rust-lang/rust/issues/42940.
172pub trait SessionManager: 'static {
173    fn on_attach_vmo(
174        self: Arc<Self>,
175        vmo: &Arc<zx::Vmo>,
176    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
177
178    /// Creates a new session to handle `stream`.
179    /// The returned future should run until the session completes, for example when the client end
180    /// closes.
181    /// If `offset_map` is set, it will be used to remap the dev_offset of FIFO requests.
182    fn open_session(
183        self: Arc<Self>,
184        stream: fblock::SessionRequestStream,
185        offset_map: Option<OffsetMap>,
186        block_size: u32,
187    ) -> impl Future<Output = Result<(), Error>> + Send;
188
189    /// Called to get block/partition information for Block::GetInfo, Partition::GetTypeGuid, etc.
190    fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;
191
192    /// Called to handle the GetVolumeInfo FIDL call.
193    fn get_volume_info(
194        &self,
195    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
196    {
197        async { Err(zx::Status::NOT_SUPPORTED) }
198    }
199
200    /// Called to handle the QuerySlices FIDL call.
201    fn query_slices(
202        &self,
203        _start_slices: &[u64],
204    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
205        async { Err(zx::Status::NOT_SUPPORTED) }
206    }
207
208    /// Called to handle the Shrink FIDL call.
209    fn extend(
210        &self,
211        _start_slice: u64,
212        _slice_count: u64,
213    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
214        async { Err(zx::Status::NOT_SUPPORTED) }
215    }
216
217    /// Called to handle the Shrink FIDL call.
218    fn shrink(
219        &self,
220        _start_slice: u64,
221        _slice_count: u64,
222    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
223        async { Err(zx::Status::NOT_SUPPORTED) }
224    }
225}
226
227pub trait IntoSessionManager {
228    type SM: SessionManager;
229
230    fn into_session_manager(self) -> Arc<Self::SM>;
231}
232
233impl<SM: SessionManager> BlockServer<SM> {
234    pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
235        Self { block_size, session_manager: session_manager.into_session_manager() }
236    }
237
238    /// Called to process requests for fuchsia.hardware.block.volume/Volume.
239    pub async fn handle_requests(
240        &self,
241        mut requests: fvolume::VolumeRequestStream,
242    ) -> Result<(), Error> {
243        let scope = fasync::Scope::new();
244        while let Some(request) = requests.try_next().await.unwrap() {
245            if let Some(session) = self.handle_request(request).await? {
246                scope.spawn(session.map(|_| ()));
247            }
248        }
249        scope.await;
250        Ok(())
251    }
252
253    /// Processes a partition request.  If a new session task is created in response to the request,
254    /// it is returned.
255    async fn handle_request(
256        &self,
257        request: fvolume::VolumeRequest,
258    ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send>, Error> {
259        match request {
260            fvolume::VolumeRequest::GetInfo { responder } => match self.device_info().await {
261                Ok(info) => {
262                    let (block_count, flags) = match info.as_ref() {
263                        DeviceInfo::Block(BlockInfo { block_count, device_flags }) => {
264                            (*block_count, *device_flags)
265                        }
266                        DeviceInfo::Partition(partition_info) => {
267                            let block_count = if let Some(range) =
268                                partition_info.block_range.as_ref()
269                            {
270                                range.end - range.start
271                            } else {
272                                let volume_info = self.session_manager.get_volume_info().await?;
273                                volume_info.0.slice_size * volume_info.1.partition_slice_count
274                                    / self.block_size as u64
275                            };
276                            (block_count, partition_info.device_flags)
277                        }
278                    };
279                    responder.send(Ok(&fblock::BlockInfo {
280                        block_count,
281                        block_size: self.block_size,
282                        max_transfer_size: fblock::MAX_TRANSFER_UNBOUNDED,
283                        flags,
284                    }))?;
285                }
286                Err(status) => responder.send(Err(status.into_raw()))?,
287            },
288            fvolume::VolumeRequest::GetStats { clear: _, responder } => {
289                // TODO(https://fxbug.dev/348077960): Implement this
290                responder.send(Err(zx::Status::NOT_SUPPORTED.into_raw()))?;
291            }
292            fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
293                return Ok(Some(self.session_manager.clone().open_session(
294                    session.into_stream(),
295                    None,
296                    self.block_size,
297                )));
298            }
299            fvolume::VolumeRequest::OpenSessionWithOffsetMap {
300                session,
301                offset_map,
302                initial_mappings,
303                control_handle: _,
304            } => {
305                if offset_map.is_some() || initial_mappings.as_ref().is_none_or(|m| m.len() != 1) {
306                    // TODO(https://fxbug.dev/402515764): Support multiple mappings and
307                    // dynamic querying for FVM as needed.  A single static map is
308                    // sufficient for GPT.
309                    session.close_with_epitaph(zx::Status::NOT_SUPPORTED)?;
310                    return Ok(None);
311                }
312                let initial_mapping = match initial_mappings.unwrap().pop().unwrap().try_into() {
313                    Ok(m) => m,
314                    Err(status) => {
315                        session.close_with_epitaph(status)?;
316                        return Ok(None);
317                    }
318                };
319                let offset_map = OffsetMap::new(initial_mapping);
320                return Ok(Some(self.session_manager.clone().open_session(
321                    session.into_stream(),
322                    Some(offset_map),
323                    self.block_size,
324                )));
325            }
326            fvolume::VolumeRequest::GetTypeGuid { responder } => match self.device_info().await {
327                Ok(info) => {
328                    if let DeviceInfo::Partition(partition_info) = info.as_ref() {
329                        let mut guid =
330                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
331                        guid.value.copy_from_slice(&partition_info.type_guid);
332                        responder.send(zx::sys::ZX_OK, Some(&guid))?;
333                    } else {
334                        responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
335                    }
336                }
337                Err(status) => {
338                    responder.send(status.into_raw(), None)?;
339                }
340            },
341            fvolume::VolumeRequest::GetInstanceGuid { responder } => {
342                match self.device_info().await {
343                    Ok(info) => {
344                        if let DeviceInfo::Partition(partition_info) = info.as_ref() {
345                            let mut guid =
346                                fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
347                            guid.value.copy_from_slice(&partition_info.instance_guid);
348                            responder.send(zx::sys::ZX_OK, Some(&guid))?;
349                        } else {
350                            responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
351                        }
352                    }
353                    Err(status) => {
354                        responder.send(status.into_raw(), None)?;
355                    }
356                }
357            }
358            fvolume::VolumeRequest::GetName { responder } => match self.device_info().await {
359                Ok(info) => {
360                    if let DeviceInfo::Partition(partition_info) = info.as_ref() {
361                        responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
362                    } else {
363                        responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
364                    }
365                }
366                Err(status) => {
367                    responder.send(status.into_raw(), None)?;
368                }
369            },
370            fvolume::VolumeRequest::GetMetadata { responder } => match self.device_info().await {
371                Ok(info) => {
372                    if let DeviceInfo::Partition(info) = info.as_ref() {
373                        let mut type_guid =
374                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
375                        type_guid.value.copy_from_slice(&info.type_guid);
376                        let mut instance_guid =
377                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
378                        instance_guid.value.copy_from_slice(&info.instance_guid);
379                        responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
380                            name: Some(info.name.clone()),
381                            type_guid: Some(type_guid),
382                            instance_guid: Some(instance_guid),
383                            start_block_offset: info.block_range.as_ref().map(|range| range.start),
384                            num_blocks: info
385                                .block_range
386                                .as_ref()
387                                .map(|range| range.end - range.start),
388                            flags: Some(info.flags),
389                            ..Default::default()
390                        }))?;
391                    }
392                }
393                Err(status) => responder.send(Err(status.into_raw()))?,
394            },
395            fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
396                match self.session_manager.query_slices(&start_slices).await {
397                    Ok(mut results) => {
398                        let results_len = results.len();
399                        assert!(results_len <= 16);
400                        results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
401                        responder.send(
402                            zx::sys::ZX_OK,
403                            &results.try_into().unwrap(),
404                            results_len as u64,
405                        )?;
406                    }
407                    Err(s) => {
408                        responder.send(
409                            s.into_raw(),
410                            &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
411                            0,
412                        )?;
413                    }
414                }
415            }
416            fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
417                match self.session_manager.get_volume_info().await {
418                    Ok((manager_info, volume_info)) => {
419                        responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
420                    }
421                    Err(s) => responder.send(s.into_raw(), None, None)?,
422                }
423            }
424            fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
425                responder.send(
426                    zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
427                        .into_raw(),
428                )?;
429            }
430            fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
431                responder.send(
432                    zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
433                        .into_raw(),
434                )?;
435            }
436            fvolume::VolumeRequest::Destroy { responder, .. } => {
437                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
438            }
439        }
440        Ok(None)
441    }
442
443    async fn device_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
444        self.session_manager.get_info().await
445    }
446}
447
448struct SessionHelper<SM: SessionManager> {
449    session_manager: Arc<SM>,
450    offset_map: Option<OffsetMap>,
451    block_size: u32,
452    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
453    vmos: Mutex<BTreeMap<u16, Arc<zx::Vmo>>>,
454    message_groups: FifoMessageGroups,
455}
456
457impl<SM: SessionManager> SessionHelper<SM> {
458    fn new(
459        session_manager: Arc<SM>,
460        offset_map: Option<OffsetMap>,
461        block_size: u32,
462    ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
463        let (peer_fifo, fifo) = zx::Fifo::create(16)?;
464        Ok((
465            Self {
466                session_manager,
467                offset_map,
468                block_size,
469                peer_fifo,
470                vmos: Mutex::default(),
471                message_groups: FifoMessageGroups::default(),
472            },
473            fifo,
474        ))
475    }
476
477    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
478        match request {
479            fblock::SessionRequest::GetFifo { responder } => {
480                let rights = zx::Rights::TRANSFER
481                    | zx::Rights::READ
482                    | zx::Rights::WRITE
483                    | zx::Rights::SIGNAL
484                    | zx::Rights::WAIT;
485                match self.peer_fifo.duplicate_handle(rights) {
486                    Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
487                    Err(s) => responder.send(Err(s.into_raw()))?,
488                }
489                Ok(())
490            }
491            fblock::SessionRequest::AttachVmo { vmo, responder } => {
492                let vmo = Arc::new(vmo);
493                let vmo_id = {
494                    let mut vmos = self.vmos.lock().unwrap();
495                    if vmos.len() == u16::MAX as usize {
496                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
497                        return Ok(());
498                    } else {
499                        let vmo_id = match vmos.last_entry() {
500                            None => 1,
501                            Some(o) => {
502                                o.key().checked_add(1).unwrap_or_else(|| {
503                                    let mut vmo_id = 1;
504                                    // Find the first gap...
505                                    for (&id, _) in &*vmos {
506                                        if id > vmo_id {
507                                            break;
508                                        }
509                                        vmo_id = id + 1;
510                                    }
511                                    vmo_id
512                                })
513                            }
514                        };
515                        vmos.insert(vmo_id, vmo.clone());
516                        vmo_id
517                    }
518                };
519                self.session_manager.clone().on_attach_vmo(&vmo).await?;
520                responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
521                Ok(())
522            }
523            fblock::SessionRequest::Close { responder } => {
524                responder.send(Ok(()))?;
525                Err(anyhow!("Closed"))
526            }
527        }
528    }
529
530    fn finish_fifo_request(
531        &self,
532        request: RequestTracking,
533        status: zx::Status,
534    ) -> Option<BlockFifoResponse> {
535        match request.group_or_request {
536            GroupOrRequest::Group(group_id) => {
537                let response = self.message_groups.complete(group_id, status);
538                fuchsia_trace::duration!(
539                    c"storage",
540                    c"block_server::finish_transaction_in_group",
541                    "group" => u32::from(group_id),
542                    "group_completed" => response.is_some(),
543                    "status" => status.into_raw());
544                if let Some(trace_flow_id) = request.trace_flow_id {
545                    fuchsia_trace::flow_step!(
546                        c"storage",
547                        c"block_server::finish_transaction",
548                        trace_flow_id.get().into()
549                    );
550                }
551                response
552            }
553            GroupOrRequest::Request(reqid) => {
554                fuchsia_trace::duration!(
555                    c"storage", c"block_server::finish_transaction", "status" => status.into_raw());
556                if let Some(trace_flow_id) = request.trace_flow_id {
557                    fuchsia_trace::flow_step!(
558                        c"storage",
559                        c"block_server::finish_transaction",
560                        trace_flow_id.get().into()
561                    );
562                }
563                Some(BlockFifoResponse { status: status.into_raw(), reqid, ..Default::default() })
564            }
565        }
566    }
567
568    fn decode_fifo_request(&self, request: &BlockFifoRequest) -> Option<DecodedRequest> {
569        let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
570        let is_group = flags.contains(BlockIoFlag::GROUP_ITEM);
571        let last_in_group = flags.contains(BlockIoFlag::GROUP_LAST);
572        let mut op_code =
573            BlockOpcode::from_primitive(request.command.opcode).ok_or(zx::Status::INVALID_ARGS);
574
575        let group_or_request = if is_group {
576            let mut groups = self.message_groups.0.lock().unwrap();
577            let group = groups.entry(request.group).or_insert_with(|| FifoMessageGroup::new());
578            if group.req_id.is_some() {
579                // We have already received a request tagged as last.
580                if group.status == zx::Status::OK {
581                    group.status = zx::Status::INVALID_ARGS;
582                }
583                return None;
584            }
585            if last_in_group {
586                group.req_id = Some(request.reqid);
587                // If the group has had an error, there is no point trying to issue this request.
588                if group.status != zx::Status::OK {
589                    op_code = Err(group.status);
590                }
591            } else if group.status != zx::Status::OK {
592                // The group has already encountered an error, so there is no point trying to issue
593                // this request.
594                return None;
595            }
596            group.count += 1;
597            GroupOrRequest::Group(request.group)
598        } else {
599            GroupOrRequest::Request(request.reqid)
600        };
601
602        let mut vmo_offset = 0;
603        let vmo = match op_code {
604            Ok(BlockOpcode::Read) | Ok(BlockOpcode::Write) => (|| {
605                if request.length == 0 {
606                    return Err(zx::Status::INVALID_ARGS);
607                }
608                vmo_offset = request
609                    .vmo_offset
610                    .checked_mul(self.block_size as u64)
611                    .ok_or(zx::Status::OUT_OF_RANGE)?;
612                self.vmos
613                    .lock()
614                    .unwrap()
615                    .get(&request.vmoid)
616                    .cloned()
617                    .map_or(Err(zx::Status::IO), |vmo| Ok(Some(vmo)))
618            })(),
619            Ok(BlockOpcode::CloseVmo) => self
620                .vmos
621                .lock()
622                .unwrap()
623                .remove(&request.vmoid)
624                .map_or(Err(zx::Status::IO), |vmo| Ok(Some(vmo))),
625            _ => Ok(None),
626        }
627        .unwrap_or_else(|e| {
628            op_code = Err(e);
629            None
630        });
631
632        let operation = op_code.map(|code| match code {
633            BlockOpcode::Read => Operation::Read {
634                device_block_offset: request.dev_offset,
635                block_count: request.length,
636                _unused: 0,
637                vmo_offset,
638            },
639            BlockOpcode::Write => Operation::Write {
640                device_block_offset: request.dev_offset,
641                block_count: request.length,
642                options: if flags.contains(BlockIoFlag::FORCE_ACCESS) {
643                    WriteOptions::FORCE_ACCESS
644                } else {
645                    WriteOptions::empty()
646                },
647                vmo_offset,
648            },
649            BlockOpcode::Flush => Operation::Flush,
650            BlockOpcode::Trim => Operation::Trim {
651                device_block_offset: request.dev_offset,
652                block_count: request.length,
653            },
654            BlockOpcode::CloseVmo => Operation::CloseVmo,
655        });
656        if let Ok(operation) = operation.as_ref() {
657            use fuchsia_trace::ArgValue;
658            static CACHE: AtomicU64 = AtomicU64::new(0);
659            if let Some(context) =
660                fuchsia_trace::TraceCategoryContext::acquire_cached(c"storage", &CACHE)
661            {
662                let trace_args_with_group = [
663                    ArgValue::of("group", u32::from(group_or_request.group_id())),
664                    ArgValue::of("opcode", operation.trace_label()),
665                ];
666                let trace_args = [ArgValue::of("opcode", operation.trace_label())];
667                let _scope = if group_or_request.is_group() {
668                    fuchsia_trace::duration(
669                        c"storage",
670                        c"block_server::start_transaction",
671                        &trace_args_with_group,
672                    )
673                } else {
674                    fuchsia_trace::duration(
675                        c"storage",
676                        c"block_server::start_transaction",
677                        &trace_args,
678                    )
679                };
680                let trace_flow_id = NonZero::new(request.trace_flow_id);
681                if let Some(trace_flow_id) = trace_flow_id.clone() {
682                    fuchsia_trace::flow_step(
683                        &context,
684                        c"block_server::start_trnsaction",
685                        trace_flow_id.get().into(),
686                        &[],
687                    );
688                }
689            }
690        }
691        Some(DecodedRequest::new(
692            RequestTracking {
693                group_or_request,
694                trace_flow_id: NonZero::new(request.trace_flow_id),
695            },
696            operation,
697            vmo,
698            self.offset_map.as_ref(),
699        ))
700    }
701
702    fn take_vmos(&self) -> BTreeMap<u16, Arc<zx::Vmo>> {
703        std::mem::take(&mut *self.vmos.lock().unwrap())
704    }
705}
706
707#[derive(Debug)]
708struct DecodedRequest {
709    request_tracking: RequestTracking,
710    operation: Result<Operation, zx::Status>,
711    vmo: Option<Arc<zx::Vmo>>,
712}
713
714impl DecodedRequest {
715    fn new(
716        request_tracking: RequestTracking,
717        operation: Result<Operation, zx::Status>,
718        vmo: Option<Arc<zx::Vmo>>,
719        offset_map: Option<&OffsetMap>,
720    ) -> Self {
721        let operation =
722            operation.and_then(|operation| operation.validate_and_adjust_range(offset_map));
723        Self { request_tracking, operation, vmo }
724    }
725}
726
727/// cbindgen:no-export
728pub type WriteOptions = block_protocol::WriteOptions;
729
730#[repr(C)]
731#[derive(Debug)]
732pub enum Operation {
733    // NOTE: On the C++ side, this ends up as a union and, for efficiency reasons, there is code
734    // that assumes that some fields for reads and writes (and possibly trim) line-up (e.g. common
735    // code can read `device_block_offset` from the read variant and then assume it's valid for the
736    // write variant).
737    Read {
738        device_block_offset: u64,
739        block_count: u32,
740        _unused: u32,
741        vmo_offset: u64,
742    },
743    Write {
744        device_block_offset: u64,
745        block_count: u32,
746        options: WriteOptions,
747        vmo_offset: u64,
748    },
749    Flush,
750    Trim {
751        device_block_offset: u64,
752        block_count: u32,
753    },
754    /// This will never be seen by the C interface.
755    CloseVmo,
756}
757
758impl Operation {
759    // Adjusts the operation's block range via `OffsetMap`.  If the request would exceed the range
760    // known to the map, or is otherwise invalid, returns an error.
761    fn validate_and_adjust_range(
762        mut self,
763        offset_map: Option<&OffsetMap>,
764    ) -> Result<Operation, zx::Status> {
765        let adjust_offset = |dev_offset, length| {
766            // For compatibility with the C++ server, always ensure the length is nonzero
767            if length == 0 {
768                return Err(zx::Status::INVALID_ARGS);
769            }
770            if let Some(offset_map) = offset_map {
771                offset_map.adjust_request(dev_offset, length as u64).ok_or(zx::Status::OUT_OF_RANGE)
772            } else {
773                Ok(dev_offset)
774            }
775        };
776        match &mut self {
777            Operation::Read { device_block_offset, block_count, .. } => {
778                *device_block_offset = adjust_offset(*device_block_offset, *block_count)?;
779            }
780            Operation::Write { device_block_offset, block_count, .. } => {
781                *device_block_offset = adjust_offset(*device_block_offset, *block_count)?;
782            }
783            Operation::Trim { device_block_offset, block_count, .. } => {
784                *device_block_offset = adjust_offset(*device_block_offset, *block_count)?;
785            }
786            _ => {}
787        }
788        Ok(self)
789    }
790    fn trace_label(&self) -> &'static str {
791        match self {
792            Operation::Read { .. } => "read",
793            Operation::Write { .. } => "write",
794            Operation::Flush { .. } => "flush",
795            Operation::Trim { .. } => "trim",
796            Operation::CloseVmo { .. } => "close_vmo",
797        }
798    }
799}
800
801#[derive(Clone, Copy, Debug)]
802pub struct RequestTracking {
803    group_or_request: GroupOrRequest,
804    trace_flow_id: Option<NonZero<u64>>,
805}
806
807#[derive(Clone, Copy, Debug)]
808pub enum GroupOrRequest {
809    Group(u16),
810    Request(u32),
811}
812
813impl GroupOrRequest {
814    fn is_group(&self) -> bool {
815        if let Self::Group(_) = self {
816            true
817        } else {
818            false
819        }
820    }
821
822    fn group_id(&self) -> u16 {
823        match self {
824            Self::Group(id) => *id,
825            Self::Request(_) => 0,
826        }
827    }
828}
829
830/// cbindgen:ignore
831const IS_GROUP: u64 = 0x8000_0000_0000_0000;
832/// cbindgen:ignore
833const USED_VMO: u64 = 0x4000_0000_0000_0000;
834#[repr(transparent)]
835#[derive(Clone, Copy, Eq, PartialEq, Hash)]
836pub struct RequestId(u64);
837
838impl RequestId {
839    /// Marks the request as having used a VMO, so that we keep the VMO alive until
840    /// the request has finished.
841    fn with_vmo(self) -> Self {
842        Self(self.0 | USED_VMO)
843    }
844
845    /// Returns whether the request ID indicates a VMO was used.
846    fn did_have_vmo(&self) -> bool {
847        self.0 & USED_VMO != 0
848    }
849}
850
851impl From<GroupOrRequest> for RequestId {
852    fn from(value: GroupOrRequest) -> Self {
853        match value {
854            GroupOrRequest::Group(group) => RequestId(group as u64 | IS_GROUP),
855            GroupOrRequest::Request(request) => RequestId(request as u64),
856        }
857    }
858}
859
860impl From<RequestId> for GroupOrRequest {
861    fn from(value: RequestId) -> Self {
862        if value.0 & IS_GROUP == 0 {
863            GroupOrRequest::Request(value.0 as u32)
864        } else {
865            GroupOrRequest::Group(value.0 as u16)
866        }
867    }
868}
869
870#[cfg(test)]
871mod tests {
872    use super::{BlockServer, DeviceInfo, PartitionInfo};
873    use crate::async_interface::FIFO_MAX_REQUESTS;
874    use assert_matches::assert_matches;
875    use block_protocol::{BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, WriteOptions};
876    use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
877    use futures::channel::oneshot;
878    use futures::future::BoxFuture;
879    use futures::FutureExt as _;
880    use std::borrow::Cow;
881    use std::future::poll_fn;
882    use std::num::NonZero;
883    use std::pin::pin;
884    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
885    use std::sync::{Arc, Mutex};
886    use std::task::{Context, Poll};
887    use zx::{AsHandleRef as _, HandleBased as _};
888    use {
889        fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
890        fuchsia_async as fasync,
891    };
892
893    #[derive(Default)]
894    struct MockInterface {
895        read_hook: Option<
896            Box<
897                dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
898                    + Send
899                    + Sync,
900            >,
901        >,
902    }
903
904    impl super::async_interface::Interface for MockInterface {
905        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
906            Ok(())
907        }
908
909        async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
910            Ok(Cow::Owned(test_device_info()))
911        }
912
913        async fn read(
914            &self,
915            device_block_offset: u64,
916            block_count: u32,
917            vmo: &Arc<zx::Vmo>,
918            vmo_offset: u64,
919            _trace_flow_id: Option<NonZero<u64>>,
920        ) -> Result<(), zx::Status> {
921            if let Some(read_hook) = &self.read_hook {
922                read_hook(device_block_offset, block_count, vmo, vmo_offset).await
923            } else {
924                unimplemented!();
925            }
926        }
927
928        async fn write(
929            &self,
930            _device_block_offset: u64,
931            _block_count: u32,
932            _vmo: &Arc<zx::Vmo>,
933            _vmo_offset: u64,
934            _opts: WriteOptions,
935            _trace_flow_id: Option<NonZero<u64>>,
936        ) -> Result<(), zx::Status> {
937            unreachable!();
938        }
939
940        async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
941            unreachable!();
942        }
943
944        async fn trim(
945            &self,
946            _device_block_offset: u64,
947            _block_count: u32,
948            _trace_flow_id: Option<NonZero<u64>>,
949        ) -> Result<(), zx::Status> {
950            unreachable!();
951        }
952
953        async fn get_volume_info(
954            &self,
955        ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
956            // Hang forever for the test_requests_dont_block_sessions test.
957            let () = std::future::pending().await;
958            unreachable!();
959        }
960    }
961
962    const BLOCK_SIZE: u32 = 512;
963
964    fn test_device_info() -> DeviceInfo {
965        DeviceInfo::Partition(PartitionInfo {
966            device_flags: fblock::Flag::READONLY,
967            block_range: Some(12..34),
968            type_guid: [1; 16],
969            instance_guid: [2; 16],
970            name: "foo".to_string(),
971            flags: 0xabcd,
972        })
973    }
974
975    #[fuchsia::test]
976    async fn test_info() {
977        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
978
979        futures::join!(
980            async {
981                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
982                block_server.handle_requests(stream).await.unwrap();
983            },
984            async {
985                let expected_info = test_device_info();
986                let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
987                    info
988                } else {
989                    unreachable!()
990                };
991
992                let block_info = proxy.get_info().await.unwrap().unwrap();
993                assert_eq!(
994                    block_info.block_count,
995                    partition_info.block_range.as_ref().unwrap().end
996                        - partition_info.block_range.as_ref().unwrap().start
997                );
998                assert_eq!(block_info.flags, fblock::Flag::READONLY);
999
1000                // TODO(https://fxbug.dev/348077960): Check max_transfer_size
1001
1002                let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1003                assert_eq!(status, zx::sys::ZX_OK);
1004                assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1005
1006                let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1007                assert_eq!(status, zx::sys::ZX_OK);
1008                assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1009
1010                let (status, name) = proxy.get_name().await.unwrap();
1011                assert_eq!(status, zx::sys::ZX_OK);
1012                assert_eq!(name.as_ref(), Some(&partition_info.name));
1013
1014                let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1015                assert_eq!(metadata.name, name);
1016                assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1017                assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1018                assert_eq!(
1019                    metadata.start_block_offset,
1020                    Some(partition_info.block_range.as_ref().unwrap().start)
1021                );
1022                assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1023                assert_eq!(metadata.flags, Some(partition_info.flags));
1024
1025                std::mem::drop(proxy);
1026            }
1027        );
1028    }
1029
1030    #[fuchsia::test]
1031    async fn test_attach_vmo() {
1032        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1033
1034        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1035        let koid = vmo.get_koid().unwrap();
1036
1037        futures::join!(
1038            async {
1039                let block_server = BlockServer::new(
1040                    BLOCK_SIZE,
1041                    Arc::new(MockInterface {
1042                        read_hook: Some(Box::new(move |_, _, vmo, _| {
1043                            assert_eq!(vmo.get_koid().unwrap(), koid);
1044                            Box::pin(async { Ok(()) })
1045                        })),
1046                        ..MockInterface::default()
1047                    }),
1048                );
1049                block_server.handle_requests(stream).await.unwrap();
1050            },
1051            async move {
1052                let (session_proxy, server) = fidl::endpoints::create_proxy();
1053
1054                proxy.open_session(server).unwrap();
1055
1056                let vmo_id = session_proxy
1057                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1058                    .await
1059                    .unwrap()
1060                    .unwrap();
1061                assert_ne!(vmo_id.id, 0);
1062
1063                let mut fifo =
1064                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1065                let (mut reader, mut writer) = fifo.async_io();
1066
1067                // Keep attaching VMOs until we eventually hit the maximum.
1068                let mut count = 1;
1069                loop {
1070                    match session_proxy
1071                        .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1072                        .await
1073                        .unwrap()
1074                    {
1075                        Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1076                        Err(e) => {
1077                            assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1078                            break;
1079                        }
1080                    }
1081
1082                    // Only test every 10 to keep test time down.
1083                    if count % 10 == 0 {
1084                        writer
1085                            .write_entries(&BlockFifoRequest {
1086                                command: BlockFifoCommand {
1087                                    opcode: BlockOpcode::Read.into_primitive(),
1088                                    ..Default::default()
1089                                },
1090                                vmoid: vmo_id.id,
1091                                length: 1,
1092                                ..Default::default()
1093                            })
1094                            .await
1095                            .unwrap();
1096
1097                        let mut response = BlockFifoResponse::default();
1098                        reader.read_entries(&mut response).await.unwrap();
1099                        assert_eq!(response.status, zx::sys::ZX_OK);
1100                    }
1101
1102                    count += 1;
1103                }
1104
1105                assert_eq!(count, u16::MAX as u64);
1106
1107                // Detach the original VMO, and make sure we can then attach another one.
1108                writer
1109                    .write_entries(&BlockFifoRequest {
1110                        command: BlockFifoCommand {
1111                            opcode: BlockOpcode::CloseVmo.into_primitive(),
1112                            ..Default::default()
1113                        },
1114                        vmoid: vmo_id.id,
1115                        ..Default::default()
1116                    })
1117                    .await
1118                    .unwrap();
1119
1120                let mut response = BlockFifoResponse::default();
1121                reader.read_entries(&mut response).await.unwrap();
1122                assert_eq!(response.status, zx::sys::ZX_OK);
1123
1124                let new_vmo_id = session_proxy
1125                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1126                    .await
1127                    .unwrap()
1128                    .unwrap();
1129                // It should reuse the same ID.
1130                assert_eq!(new_vmo_id.id, vmo_id.id);
1131
1132                std::mem::drop(proxy);
1133            }
1134        );
1135    }
1136
1137    #[fuchsia::test]
1138    async fn test_close() {
1139        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1140
1141        let mut server = std::pin::pin!(async {
1142            let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1143            block_server.handle_requests(stream).await.unwrap();
1144        }
1145        .fuse());
1146
1147        let mut client = std::pin::pin!(async {
1148            let (session_proxy, server) = fidl::endpoints::create_proxy();
1149
1150            proxy.open_session(server).unwrap();
1151
1152            // Dropping the proxy should not cause the session to terminate because the session is
1153            // still live.
1154            std::mem::drop(proxy);
1155
1156            session_proxy.close().await.unwrap().unwrap();
1157
1158            // Keep the session alive.  Calling `close` should cause the server to terminate.
1159            let _: () = std::future::pending().await;
1160        }
1161        .fuse());
1162
1163        futures::select!(
1164            _ = server => {}
1165            _ = client => unreachable!(),
1166        );
1167    }
1168
1169    #[derive(Default)]
1170    struct IoMockInterface {
1171        do_checks: bool,
1172        expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1173        return_errors: bool,
1174    }
1175
1176    #[derive(Debug)]
1177    enum ExpectedOp {
1178        Read(u64, u32, u64),
1179        Write(u64, u32, u64),
1180        Trim(u64, u32),
1181        Flush,
1182    }
1183
1184    impl super::async_interface::Interface for IoMockInterface {
1185        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1186            Ok(())
1187        }
1188
1189        async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1190            Ok(Cow::Owned(test_device_info()))
1191        }
1192
1193        async fn read(
1194            &self,
1195            device_block_offset: u64,
1196            block_count: u32,
1197            _vmo: &Arc<zx::Vmo>,
1198            vmo_offset: u64,
1199            _trace_flow_id: Option<NonZero<u64>>,
1200        ) -> Result<(), zx::Status> {
1201            if self.return_errors {
1202                Err(zx::Status::INTERNAL)
1203            } else {
1204                if self.do_checks {
1205                    assert_matches!(
1206                        self.expected_op.lock().unwrap().take(),
1207                        Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1208                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1209                        "Read {device_block_offset} {block_count} {vmo_offset}"
1210                    );
1211                }
1212                Ok(())
1213            }
1214        }
1215
1216        async fn write(
1217            &self,
1218            device_block_offset: u64,
1219            block_count: u32,
1220            _vmo: &Arc<zx::Vmo>,
1221            vmo_offset: u64,
1222            _opts: WriteOptions,
1223            _trace_flow_id: Option<NonZero<u64>>,
1224        ) -> Result<(), zx::Status> {
1225            if self.return_errors {
1226                Err(zx::Status::NOT_SUPPORTED)
1227            } else {
1228                if self.do_checks {
1229                    assert_matches!(
1230                        self.expected_op.lock().unwrap().take(),
1231                        Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1232                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1233                        "Write {device_block_offset} {block_count} {vmo_offset}"
1234                    );
1235                }
1236                Ok(())
1237            }
1238        }
1239
1240        async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1241            if self.return_errors {
1242                Err(zx::Status::NO_RESOURCES)
1243            } else {
1244                if self.do_checks {
1245                    assert_matches!(
1246                        self.expected_op.lock().unwrap().take(),
1247                        Some(ExpectedOp::Flush)
1248                    );
1249                }
1250                Ok(())
1251            }
1252        }
1253
1254        async fn trim(
1255            &self,
1256            device_block_offset: u64,
1257            block_count: u32,
1258            _trace_flow_id: Option<NonZero<u64>>,
1259        ) -> Result<(), zx::Status> {
1260            if self.return_errors {
1261                Err(zx::Status::NO_MEMORY)
1262            } else {
1263                if self.do_checks {
1264                    assert_matches!(
1265                        self.expected_op.lock().unwrap().take(),
1266                        Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1267                            block_count == b,
1268                        "Trim {device_block_offset} {block_count}"
1269                    );
1270                }
1271                Ok(())
1272            }
1273        }
1274    }
1275
1276    #[fuchsia::test]
1277    async fn test_io() {
1278        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1279
1280        let expected_op = Arc::new(Mutex::new(None));
1281        let expected_op_clone = expected_op.clone();
1282
1283        let server = async {
1284            let block_server = BlockServer::new(
1285                BLOCK_SIZE,
1286                Arc::new(IoMockInterface {
1287                    return_errors: false,
1288                    do_checks: true,
1289                    expected_op: expected_op_clone,
1290                }),
1291            );
1292            block_server.handle_requests(stream).await.unwrap();
1293        };
1294
1295        let client = async move {
1296            let (session_proxy, server) = fidl::endpoints::create_proxy();
1297
1298            proxy.open_session(server).unwrap();
1299
1300            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1301            let vmo_id = session_proxy
1302                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1303                .await
1304                .unwrap()
1305                .unwrap();
1306
1307            let mut fifo =
1308                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1309            let (mut reader, mut writer) = fifo.async_io();
1310
1311            // READ
1312            *expected_op.lock().unwrap() = Some(ExpectedOp::Read(1, 2, 3));
1313            writer
1314                .write_entries(&BlockFifoRequest {
1315                    command: BlockFifoCommand {
1316                        opcode: BlockOpcode::Read.into_primitive(),
1317                        ..Default::default()
1318                    },
1319                    vmoid: vmo_id.id,
1320                    dev_offset: 1,
1321                    length: 2,
1322                    vmo_offset: 3,
1323                    ..Default::default()
1324                })
1325                .await
1326                .unwrap();
1327
1328            let mut response = BlockFifoResponse::default();
1329            reader.read_entries(&mut response).await.unwrap();
1330            assert_eq!(response.status, zx::sys::ZX_OK);
1331
1332            // WRITE
1333            *expected_op.lock().unwrap() = Some(ExpectedOp::Write(4, 5, 6));
1334            writer
1335                .write_entries(&BlockFifoRequest {
1336                    command: BlockFifoCommand {
1337                        opcode: BlockOpcode::Write.into_primitive(),
1338                        ..Default::default()
1339                    },
1340                    vmoid: vmo_id.id,
1341                    dev_offset: 4,
1342                    length: 5,
1343                    vmo_offset: 6,
1344                    ..Default::default()
1345                })
1346                .await
1347                .unwrap();
1348
1349            let mut response = BlockFifoResponse::default();
1350            reader.read_entries(&mut response).await.unwrap();
1351            assert_eq!(response.status, zx::sys::ZX_OK);
1352
1353            // FLUSH
1354            *expected_op.lock().unwrap() = Some(ExpectedOp::Flush);
1355            writer
1356                .write_entries(&BlockFifoRequest {
1357                    command: BlockFifoCommand {
1358                        opcode: BlockOpcode::Flush.into_primitive(),
1359                        ..Default::default()
1360                    },
1361                    ..Default::default()
1362                })
1363                .await
1364                .unwrap();
1365
1366            reader.read_entries(&mut response).await.unwrap();
1367            assert_eq!(response.status, zx::sys::ZX_OK);
1368
1369            // TRIM
1370            *expected_op.lock().unwrap() = Some(ExpectedOp::Trim(7, 8));
1371            writer
1372                .write_entries(&BlockFifoRequest {
1373                    command: BlockFifoCommand {
1374                        opcode: BlockOpcode::Trim.into_primitive(),
1375                        ..Default::default()
1376                    },
1377                    dev_offset: 7,
1378                    length: 8,
1379                    ..Default::default()
1380                })
1381                .await
1382                .unwrap();
1383
1384            reader.read_entries(&mut response).await.unwrap();
1385            assert_eq!(response.status, zx::sys::ZX_OK);
1386
1387            std::mem::drop(proxy);
1388        };
1389
1390        futures::join!(server, client);
1391    }
1392
1393    #[fuchsia::test]
1394    async fn test_io_errors() {
1395        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1396
1397        futures::join!(
1398            async {
1399                let block_server = BlockServer::new(
1400                    BLOCK_SIZE,
1401                    Arc::new(IoMockInterface {
1402                        return_errors: true,
1403                        do_checks: false,
1404                        expected_op: Arc::new(Mutex::new(None)),
1405                    }),
1406                );
1407                block_server.handle_requests(stream).await.unwrap();
1408            },
1409            async move {
1410                let (session_proxy, server) = fidl::endpoints::create_proxy();
1411
1412                proxy.open_session(server).unwrap();
1413
1414                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1415                let vmo_id = session_proxy
1416                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1417                    .await
1418                    .unwrap()
1419                    .unwrap();
1420
1421                let mut fifo =
1422                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1423                let (mut reader, mut writer) = fifo.async_io();
1424
1425                // READ
1426                writer
1427                    .write_entries(&BlockFifoRequest {
1428                        command: BlockFifoCommand {
1429                            opcode: BlockOpcode::Read.into_primitive(),
1430                            ..Default::default()
1431                        },
1432                        vmoid: vmo_id.id,
1433                        length: 1,
1434                        reqid: 1,
1435                        ..Default::default()
1436                    })
1437                    .await
1438                    .unwrap();
1439
1440                let mut response = BlockFifoResponse::default();
1441                reader.read_entries(&mut response).await.unwrap();
1442                assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1443
1444                // WRITE
1445                writer
1446                    .write_entries(&BlockFifoRequest {
1447                        command: BlockFifoCommand {
1448                            opcode: BlockOpcode::Write.into_primitive(),
1449                            ..Default::default()
1450                        },
1451                        vmoid: vmo_id.id,
1452                        length: 1,
1453                        reqid: 2,
1454                        ..Default::default()
1455                    })
1456                    .await
1457                    .unwrap();
1458
1459                reader.read_entries(&mut response).await.unwrap();
1460                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1461
1462                // FLUSH
1463                writer
1464                    .write_entries(&BlockFifoRequest {
1465                        command: BlockFifoCommand {
1466                            opcode: BlockOpcode::Flush.into_primitive(),
1467                            ..Default::default()
1468                        },
1469                        reqid: 3,
1470                        ..Default::default()
1471                    })
1472                    .await
1473                    .unwrap();
1474
1475                reader.read_entries(&mut response).await.unwrap();
1476                assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
1477
1478                // TRIM
1479                writer
1480                    .write_entries(&BlockFifoRequest {
1481                        command: BlockFifoCommand {
1482                            opcode: BlockOpcode::Trim.into_primitive(),
1483                            ..Default::default()
1484                        },
1485                        reqid: 4,
1486                        length: 1,
1487                        ..Default::default()
1488                    })
1489                    .await
1490                    .unwrap();
1491
1492                reader.read_entries(&mut response).await.unwrap();
1493                assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
1494
1495                std::mem::drop(proxy);
1496            }
1497        );
1498    }
1499
1500    #[fuchsia::test]
1501    async fn test_invalid_args() {
1502        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1503
1504        futures::join!(
1505            async {
1506                let block_server = BlockServer::new(
1507                    BLOCK_SIZE,
1508                    Arc::new(IoMockInterface {
1509                        return_errors: false,
1510                        do_checks: false,
1511                        expected_op: Arc::new(Mutex::new(None)),
1512                    }),
1513                );
1514                block_server.handle_requests(stream).await.unwrap();
1515            },
1516            async move {
1517                let (session_proxy, server) = fidl::endpoints::create_proxy();
1518
1519                proxy.open_session(server).unwrap();
1520
1521                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1522                let vmo_id = session_proxy
1523                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1524                    .await
1525                    .unwrap()
1526                    .unwrap();
1527
1528                let mut fifo =
1529                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1530
1531                async fn test(
1532                    fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
1533                    request: BlockFifoRequest,
1534                ) -> Result<(), zx::Status> {
1535                    let (mut reader, mut writer) = fifo.async_io();
1536                    writer.write_entries(&request).await.unwrap();
1537                    let mut response = BlockFifoResponse::default();
1538                    reader.read_entries(&mut response).await.unwrap();
1539                    zx::Status::ok(response.status)
1540                }
1541
1542                // READ
1543
1544                let good_read_request = || BlockFifoRequest {
1545                    command: BlockFifoCommand {
1546                        opcode: BlockOpcode::Read.into_primitive(),
1547                        ..Default::default()
1548                    },
1549                    vmoid: vmo_id.id,
1550                    ..Default::default()
1551                };
1552
1553                assert_eq!(
1554                    test(
1555                        &mut fifo,
1556                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
1557                    )
1558                    .await,
1559                    Err(zx::Status::INVALID_ARGS)
1560                );
1561
1562                assert_eq!(
1563                    test(
1564                        &mut fifo,
1565                        BlockFifoRequest {
1566                            vmo_offset: 0xffff_ffff_ffff_ffff,
1567                            ..good_read_request()
1568                        }
1569                    )
1570                    .await,
1571                    Err(zx::Status::INVALID_ARGS)
1572                );
1573
1574                // WRITE
1575
1576                let good_write_request = || BlockFifoRequest {
1577                    command: BlockFifoCommand {
1578                        opcode: BlockOpcode::Write.into_primitive(),
1579                        ..Default::default()
1580                    },
1581                    vmoid: vmo_id.id,
1582                    ..Default::default()
1583                };
1584
1585                assert_eq!(
1586                    test(
1587                        &mut fifo,
1588                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
1589                    )
1590                    .await,
1591                    Err(zx::Status::INVALID_ARGS)
1592                );
1593
1594                assert_eq!(
1595                    test(
1596                        &mut fifo,
1597                        BlockFifoRequest {
1598                            vmo_offset: 0xffff_ffff_ffff_ffff,
1599                            ..good_write_request()
1600                        }
1601                    )
1602                    .await,
1603                    Err(zx::Status::INVALID_ARGS)
1604                );
1605
1606                // CLOSE VMO
1607
1608                assert_eq!(
1609                    test(
1610                        &mut fifo,
1611                        BlockFifoRequest {
1612                            command: BlockFifoCommand {
1613                                opcode: BlockOpcode::CloseVmo.into_primitive(),
1614                                ..Default::default()
1615                            },
1616                            vmoid: vmo_id.id + 1,
1617                            ..Default::default()
1618                        }
1619                    )
1620                    .await,
1621                    Err(zx::Status::IO)
1622                );
1623
1624                std::mem::drop(proxy);
1625            }
1626        );
1627    }
1628
1629    #[fuchsia::test]
1630    async fn test_concurrent_requests() {
1631        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1632
1633        let waiting_readers = Arc::new(Mutex::new(Vec::new()));
1634        let waiting_readers_clone = waiting_readers.clone();
1635
1636        futures::join!(
1637            async move {
1638                let block_server = BlockServer::new(
1639                    BLOCK_SIZE,
1640                    Arc::new(MockInterface {
1641                        read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
1642                            let (tx, rx) = oneshot::channel();
1643                            waiting_readers_clone
1644                                .lock()
1645                                .unwrap()
1646                                .push((dev_block_offset as u32, tx));
1647                            Box::pin(async move {
1648                                let _ = rx.await;
1649                                Ok(())
1650                            })
1651                        })),
1652                    }),
1653                );
1654                block_server.handle_requests(stream).await.unwrap();
1655            },
1656            async move {
1657                let (session_proxy, server) = fidl::endpoints::create_proxy();
1658
1659                proxy.open_session(server).unwrap();
1660
1661                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1662                let vmo_id = session_proxy
1663                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1664                    .await
1665                    .unwrap()
1666                    .unwrap();
1667
1668                let mut fifo =
1669                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1670                let (mut reader, mut writer) = fifo.async_io();
1671
1672                writer
1673                    .write_entries(&BlockFifoRequest {
1674                        command: BlockFifoCommand {
1675                            opcode: BlockOpcode::Read.into_primitive(),
1676                            ..Default::default()
1677                        },
1678                        reqid: 1,
1679                        dev_offset: 1, // Intentionally use the same as `reqid`.
1680                        vmoid: vmo_id.id,
1681                        length: 1,
1682                        ..Default::default()
1683                    })
1684                    .await
1685                    .unwrap();
1686
1687                writer
1688                    .write_entries(&BlockFifoRequest {
1689                        command: BlockFifoCommand {
1690                            opcode: BlockOpcode::Read.into_primitive(),
1691                            ..Default::default()
1692                        },
1693                        reqid: 2,
1694                        dev_offset: 2,
1695                        vmoid: vmo_id.id,
1696                        length: 1,
1697                        ..Default::default()
1698                    })
1699                    .await
1700                    .unwrap();
1701
1702                // Wait till both those entries are pending.
1703                poll_fn(|cx: &mut Context<'_>| {
1704                    if waiting_readers.lock().unwrap().len() == 2 {
1705                        Poll::Ready(())
1706                    } else {
1707                        // Yield to the executor.
1708                        cx.waker().wake_by_ref();
1709                        Poll::Pending
1710                    }
1711                })
1712                .await;
1713
1714                let mut response = BlockFifoResponse::default();
1715                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1716
1717                let (id, tx) = waiting_readers.lock().unwrap().pop().unwrap();
1718                tx.send(()).unwrap();
1719
1720                reader.read_entries(&mut response).await.unwrap();
1721                assert_eq!(response.status, zx::sys::ZX_OK);
1722                assert_eq!(response.reqid, id);
1723
1724                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1725
1726                let (id, tx) = waiting_readers.lock().unwrap().pop().unwrap();
1727                tx.send(()).unwrap();
1728
1729                reader.read_entries(&mut response).await.unwrap();
1730                assert_eq!(response.status, zx::sys::ZX_OK);
1731                assert_eq!(response.reqid, id);
1732            }
1733        );
1734    }
1735
1736    #[fuchsia::test]
1737    async fn test_groups() {
1738        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1739
1740        futures::join!(
1741            async move {
1742                let block_server = BlockServer::new(
1743                    BLOCK_SIZE,
1744                    Arc::new(MockInterface {
1745                        read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
1746                    }),
1747                );
1748                block_server.handle_requests(stream).await.unwrap();
1749            },
1750            async move {
1751                let (session_proxy, server) = fidl::endpoints::create_proxy();
1752
1753                proxy.open_session(server).unwrap();
1754
1755                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1756                let vmo_id = session_proxy
1757                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1758                    .await
1759                    .unwrap()
1760                    .unwrap();
1761
1762                let mut fifo =
1763                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1764                let (mut reader, mut writer) = fifo.async_io();
1765
1766                writer
1767                    .write_entries(&BlockFifoRequest {
1768                        command: BlockFifoCommand {
1769                            opcode: BlockOpcode::Read.into_primitive(),
1770                            flags: BlockIoFlag::GROUP_ITEM.bits(),
1771                            ..Default::default()
1772                        },
1773                        group: 1,
1774                        vmoid: vmo_id.id,
1775                        length: 1,
1776                        ..Default::default()
1777                    })
1778                    .await
1779                    .unwrap();
1780
1781                writer
1782                    .write_entries(&BlockFifoRequest {
1783                        command: BlockFifoCommand {
1784                            opcode: BlockOpcode::Read.into_primitive(),
1785                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1786                            ..Default::default()
1787                        },
1788                        reqid: 2,
1789                        group: 1,
1790                        vmoid: vmo_id.id,
1791                        length: 1,
1792                        ..Default::default()
1793                    })
1794                    .await
1795                    .unwrap();
1796
1797                let mut response = BlockFifoResponse::default();
1798                reader.read_entries(&mut response).await.unwrap();
1799                assert_eq!(response.status, zx::sys::ZX_OK);
1800                assert_eq!(response.reqid, 2);
1801                assert_eq!(response.group, 1);
1802            }
1803        );
1804    }
1805
1806    #[fuchsia::test]
1807    async fn test_group_error() {
1808        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1809
1810        let counter = Arc::new(AtomicU64::new(0));
1811        let counter_clone = counter.clone();
1812
1813        futures::join!(
1814            async move {
1815                let block_server = BlockServer::new(
1816                    BLOCK_SIZE,
1817                    Arc::new(MockInterface {
1818                        read_hook: Some(Box::new(move |_, _, _, _| {
1819                            counter_clone.fetch_add(1, Ordering::Relaxed);
1820                            Box::pin(async { Err(zx::Status::BAD_STATE) })
1821                        })),
1822                    }),
1823                );
1824                block_server.handle_requests(stream).await.unwrap();
1825            },
1826            async move {
1827                let (session_proxy, server) = fidl::endpoints::create_proxy();
1828
1829                proxy.open_session(server).unwrap();
1830
1831                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1832                let vmo_id = session_proxy
1833                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1834                    .await
1835                    .unwrap()
1836                    .unwrap();
1837
1838                let mut fifo =
1839                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1840                let (mut reader, mut writer) = fifo.async_io();
1841
1842                writer
1843                    .write_entries(&BlockFifoRequest {
1844                        command: BlockFifoCommand {
1845                            opcode: BlockOpcode::Read.into_primitive(),
1846                            flags: BlockIoFlag::GROUP_ITEM.bits(),
1847                            ..Default::default()
1848                        },
1849                        group: 1,
1850                        vmoid: vmo_id.id,
1851                        length: 1,
1852                        ..Default::default()
1853                    })
1854                    .await
1855                    .unwrap();
1856
1857                // Wait until processed.
1858                poll_fn(|cx: &mut Context<'_>| {
1859                    if counter.load(Ordering::Relaxed) == 1 {
1860                        Poll::Ready(())
1861                    } else {
1862                        // Yield to the executor.
1863                        cx.waker().wake_by_ref();
1864                        Poll::Pending
1865                    }
1866                })
1867                .await;
1868
1869                let mut response = BlockFifoResponse::default();
1870                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1871
1872                writer
1873                    .write_entries(&BlockFifoRequest {
1874                        command: BlockFifoCommand {
1875                            opcode: BlockOpcode::Read.into_primitive(),
1876                            flags: BlockIoFlag::GROUP_ITEM.bits(),
1877                            ..Default::default()
1878                        },
1879                        group: 1,
1880                        vmoid: vmo_id.id,
1881                        length: 1,
1882                        ..Default::default()
1883                    })
1884                    .await
1885                    .unwrap();
1886
1887                writer
1888                    .write_entries(&BlockFifoRequest {
1889                        command: BlockFifoCommand {
1890                            opcode: BlockOpcode::Read.into_primitive(),
1891                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1892                            ..Default::default()
1893                        },
1894                        reqid: 2,
1895                        group: 1,
1896                        vmoid: vmo_id.id,
1897                        length: 1,
1898                        ..Default::default()
1899                    })
1900                    .await
1901                    .unwrap();
1902
1903                reader.read_entries(&mut response).await.unwrap();
1904                assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
1905                assert_eq!(response.reqid, 2);
1906                assert_eq!(response.group, 1);
1907
1908                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1909
1910                // Only the first request should have been processed.
1911                assert_eq!(counter.load(Ordering::Relaxed), 1);
1912            }
1913        );
1914    }
1915
1916    #[fuchsia::test]
1917    async fn test_group_with_two_lasts() {
1918        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1919
1920        let (tx, rx) = oneshot::channel();
1921
1922        futures::join!(
1923            async move {
1924                let rx = Mutex::new(Some(rx));
1925                let block_server = BlockServer::new(
1926                    BLOCK_SIZE,
1927                    Arc::new(MockInterface {
1928                        read_hook: Some(Box::new(move |_, _, _, _| {
1929                            let rx = rx.lock().unwrap().take().unwrap();
1930                            Box::pin(async {
1931                                let _ = rx.await;
1932                                Ok(())
1933                            })
1934                        })),
1935                    }),
1936                );
1937                block_server.handle_requests(stream).await.unwrap();
1938            },
1939            async move {
1940                let (session_proxy, server) = fidl::endpoints::create_proxy();
1941
1942                proxy.open_session(server).unwrap();
1943
1944                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1945                let vmo_id = session_proxy
1946                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1947                    .await
1948                    .unwrap()
1949                    .unwrap();
1950
1951                let mut fifo =
1952                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1953                let (mut reader, mut writer) = fifo.async_io();
1954
1955                writer
1956                    .write_entries(&BlockFifoRequest {
1957                        command: BlockFifoCommand {
1958                            opcode: BlockOpcode::Read.into_primitive(),
1959                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1960                            ..Default::default()
1961                        },
1962                        reqid: 1,
1963                        group: 1,
1964                        vmoid: vmo_id.id,
1965                        length: 1,
1966                        ..Default::default()
1967                    })
1968                    .await
1969                    .unwrap();
1970
1971                writer
1972                    .write_entries(&BlockFifoRequest {
1973                        command: BlockFifoCommand {
1974                            opcode: BlockOpcode::Read.into_primitive(),
1975                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1976                            ..Default::default()
1977                        },
1978                        reqid: 2,
1979                        group: 1,
1980                        vmoid: vmo_id.id,
1981                        length: 1,
1982                        ..Default::default()
1983                    })
1984                    .await
1985                    .unwrap();
1986
1987                // Send an independent request to flush through the fifo.
1988                writer
1989                    .write_entries(&BlockFifoRequest {
1990                        command: BlockFifoCommand {
1991                            opcode: BlockOpcode::CloseVmo.into_primitive(),
1992                            ..Default::default()
1993                        },
1994                        reqid: 3,
1995                        vmoid: vmo_id.id,
1996                        ..Default::default()
1997                    })
1998                    .await
1999                    .unwrap();
2000
2001                // It should succeed.
2002                let mut response = BlockFifoResponse::default();
2003                reader.read_entries(&mut response).await.unwrap();
2004                assert_eq!(response.status, zx::sys::ZX_OK);
2005                assert_eq!(response.reqid, 3);
2006
2007                // Now release the original request.
2008                tx.send(()).unwrap();
2009
2010                // The response should be for the first message tagged as last, and it should be
2011                // an error because we sent two messages with the LAST marker.
2012                let mut response = BlockFifoResponse::default();
2013                reader.read_entries(&mut response).await.unwrap();
2014                assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2015                assert_eq!(response.reqid, 1);
2016                assert_eq!(response.group, 1);
2017            }
2018        );
2019    }
2020
2021    #[fuchsia::test(allow_stalls = false)]
2022    async fn test_requests_dont_block_sessions() {
2023        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2024
2025        let (tx, rx) = oneshot::channel();
2026
2027        fasync::Task::local(async move {
2028            let rx = Mutex::new(Some(rx));
2029            let block_server = BlockServer::new(
2030                BLOCK_SIZE,
2031                Arc::new(MockInterface {
2032                    read_hook: Some(Box::new(move |_, _, _, _| {
2033                        let rx = rx.lock().unwrap().take().unwrap();
2034                        Box::pin(async {
2035                            let _ = rx.await;
2036                            Ok(())
2037                        })
2038                    })),
2039                }),
2040            );
2041            block_server.handle_requests(stream).await.unwrap();
2042        })
2043        .detach();
2044
2045        let mut fut = pin!(async {
2046            let (session_proxy, server) = fidl::endpoints::create_proxy();
2047
2048            proxy.open_session(server).unwrap();
2049
2050            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2051            let vmo_id = session_proxy
2052                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2053                .await
2054                .unwrap()
2055                .unwrap();
2056
2057            let mut fifo =
2058                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2059            let (mut reader, mut writer) = fifo.async_io();
2060
2061            writer
2062                .write_entries(&BlockFifoRequest {
2063                    command: BlockFifoCommand {
2064                        opcode: BlockOpcode::Read.into_primitive(),
2065                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2066                        ..Default::default()
2067                    },
2068                    reqid: 1,
2069                    group: 1,
2070                    vmoid: vmo_id.id,
2071                    length: 1,
2072                    ..Default::default()
2073                })
2074                .await
2075                .unwrap();
2076
2077            let mut response = BlockFifoResponse::default();
2078            reader.read_entries(&mut response).await.unwrap();
2079            assert_eq!(response.status, zx::sys::ZX_OK);
2080        });
2081
2082        // The response won't come back until we send on `tx`.
2083        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2084
2085        let mut fut2 = pin!(proxy.get_volume_info());
2086
2087        // get_volume_info is set up to stall forever.
2088        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2089
2090        // If we now free up the first future, it should resolve; the stalled call to
2091        // get_volume_info should not block the fifo response.
2092        let _ = tx.send(());
2093
2094        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2095    }
2096
2097    #[fuchsia::test]
2098    async fn test_request_flow_control() {
2099        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2100
2101        // The client will ensure that MAX_REQUESTS are queued up before firing `event`, and the
2102        // server will block until that happens.
2103        const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2104        let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2105        let event_clone = event.clone();
2106        futures::join!(
2107            async move {
2108                let block_server = BlockServer::new(
2109                    BLOCK_SIZE,
2110                    Arc::new(MockInterface {
2111                        read_hook: Some(Box::new(move |_, _, _, _| {
2112                            let event_clone = event_clone.clone();
2113                            Box::pin(async move {
2114                                if !event_clone.1.load(Ordering::SeqCst) {
2115                                    event_clone.0.listen().await;
2116                                }
2117                                Ok(())
2118                            })
2119                        })),
2120                    }),
2121                );
2122                block_server.handle_requests(stream).await.unwrap();
2123            },
2124            async move {
2125                let (session_proxy, server) = fidl::endpoints::create_proxy();
2126
2127                proxy.open_session(server).unwrap();
2128
2129                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2130                let vmo_id = session_proxy
2131                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2132                    .await
2133                    .unwrap()
2134                    .unwrap();
2135
2136                let mut fifo =
2137                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2138                let (mut reader, mut writer) = fifo.async_io();
2139
2140                for i in 0..MAX_REQUESTS {
2141                    writer
2142                        .write_entries(&BlockFifoRequest {
2143                            command: BlockFifoCommand {
2144                                opcode: BlockOpcode::Read.into_primitive(),
2145                                ..Default::default()
2146                            },
2147                            reqid: (i + 1) as u32,
2148                            dev_offset: i,
2149                            vmoid: vmo_id.id,
2150                            length: 1,
2151                            ..Default::default()
2152                        })
2153                        .await
2154                        .unwrap();
2155                }
2156                assert!(futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2157                    command: BlockFifoCommand {
2158                        opcode: BlockOpcode::Read.into_primitive(),
2159                        ..Default::default()
2160                    },
2161                    reqid: u32::MAX,
2162                    dev_offset: MAX_REQUESTS,
2163                    vmoid: vmo_id.id,
2164                    length: 1,
2165                    ..Default::default()
2166                })))
2167                .is_pending());
2168                // OK, let the server start to process.
2169                event.1.store(true, Ordering::SeqCst);
2170                event.0.notify(usize::MAX);
2171                // For each entry we read, make sure we can write a new one in.
2172                let mut finished_reqids = vec![];
2173                for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2174                    let mut response = BlockFifoResponse::default();
2175                    reader.read_entries(&mut response).await.unwrap();
2176                    assert_eq!(response.status, zx::sys::ZX_OK);
2177                    finished_reqids.push(response.reqid);
2178                    writer
2179                        .write_entries(&BlockFifoRequest {
2180                            command: BlockFifoCommand {
2181                                opcode: BlockOpcode::Read.into_primitive(),
2182                                ..Default::default()
2183                            },
2184                            reqid: (i + 1) as u32,
2185                            dev_offset: i,
2186                            vmoid: vmo_id.id,
2187                            length: 1,
2188                            ..Default::default()
2189                        })
2190                        .await
2191                        .unwrap();
2192                }
2193                let mut response = BlockFifoResponse::default();
2194                for _ in 0..MAX_REQUESTS {
2195                    reader.read_entries(&mut response).await.unwrap();
2196                    assert_eq!(response.status, zx::sys::ZX_OK);
2197                    finished_reqids.push(response.reqid);
2198                }
2199                // Verify that we got a response for each request.  Note that we can't assume FIFO
2200                // ordering.
2201                finished_reqids.sort();
2202                assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2203                let mut i = 1;
2204                for reqid in finished_reqids {
2205                    assert_eq!(reqid, i);
2206                    i += 1;
2207                }
2208            }
2209        );
2210    }
2211
2212    #[fuchsia::test]
2213    async fn test_passthrough_io_with_fixed_map() {
2214        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2215
2216        let expected_op = Arc::new(Mutex::new(None));
2217        let expected_op_clone = expected_op.clone();
2218        futures::join!(
2219            async {
2220                let block_server = BlockServer::new(
2221                    BLOCK_SIZE,
2222                    Arc::new(IoMockInterface {
2223                        return_errors: false,
2224                        do_checks: true,
2225                        expected_op: expected_op_clone,
2226                    }),
2227                );
2228                block_server.handle_requests(stream).await.unwrap();
2229            },
2230            async move {
2231                let (session_proxy, server) = fidl::endpoints::create_proxy();
2232
2233                let mappings = [fblock::BlockOffsetMapping {
2234                    source_block_offset: 0,
2235                    target_block_offset: 10,
2236                    length: 20,
2237                }];
2238                proxy.open_session_with_offset_map(server, None, Some(&mappings[..])).unwrap();
2239
2240                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2241                let vmo_id = session_proxy
2242                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2243                    .await
2244                    .unwrap()
2245                    .unwrap();
2246
2247                let mut fifo =
2248                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2249                let (mut reader, mut writer) = fifo.async_io();
2250
2251                // READ
2252                *expected_op.lock().unwrap() = Some(ExpectedOp::Read(11, 2, 3));
2253                writer
2254                    .write_entries(&BlockFifoRequest {
2255                        command: BlockFifoCommand {
2256                            opcode: BlockOpcode::Read.into_primitive(),
2257                            ..Default::default()
2258                        },
2259                        vmoid: vmo_id.id,
2260                        dev_offset: 1,
2261                        length: 2,
2262                        vmo_offset: 3,
2263                        ..Default::default()
2264                    })
2265                    .await
2266                    .unwrap();
2267
2268                let mut response = BlockFifoResponse::default();
2269                reader.read_entries(&mut response).await.unwrap();
2270                assert_eq!(response.status, zx::sys::ZX_OK);
2271
2272                // WRITE
2273                *expected_op.lock().unwrap() = Some(ExpectedOp::Write(14, 5, 6));
2274                writer
2275                    .write_entries(&BlockFifoRequest {
2276                        command: BlockFifoCommand {
2277                            opcode: BlockOpcode::Write.into_primitive(),
2278                            ..Default::default()
2279                        },
2280                        vmoid: vmo_id.id,
2281                        dev_offset: 4,
2282                        length: 5,
2283                        vmo_offset: 6,
2284                        ..Default::default()
2285                    })
2286                    .await
2287                    .unwrap();
2288
2289                reader.read_entries(&mut response).await.unwrap();
2290                assert_eq!(response.status, zx::sys::ZX_OK);
2291
2292                // FLUSH
2293                *expected_op.lock().unwrap() = Some(ExpectedOp::Flush);
2294                writer
2295                    .write_entries(&BlockFifoRequest {
2296                        command: BlockFifoCommand {
2297                            opcode: BlockOpcode::Flush.into_primitive(),
2298                            ..Default::default()
2299                        },
2300                        ..Default::default()
2301                    })
2302                    .await
2303                    .unwrap();
2304
2305                reader.read_entries(&mut response).await.unwrap();
2306                assert_eq!(response.status, zx::sys::ZX_OK);
2307
2308                // TRIM
2309                *expected_op.lock().unwrap() = Some(ExpectedOp::Trim(17, 3));
2310                writer
2311                    .write_entries(&BlockFifoRequest {
2312                        command: BlockFifoCommand {
2313                            opcode: BlockOpcode::Trim.into_primitive(),
2314                            ..Default::default()
2315                        },
2316                        dev_offset: 7,
2317                        length: 3,
2318                        ..Default::default()
2319                    })
2320                    .await
2321                    .unwrap();
2322
2323                reader.read_entries(&mut response).await.unwrap();
2324                assert_eq!(response.status, zx::sys::ZX_OK);
2325
2326                // READ past window
2327                *expected_op.lock().unwrap() = None;
2328                writer
2329                    .write_entries(&BlockFifoRequest {
2330                        command: BlockFifoCommand {
2331                            opcode: BlockOpcode::Read.into_primitive(),
2332                            ..Default::default()
2333                        },
2334                        vmoid: vmo_id.id,
2335                        dev_offset: 19,
2336                        length: 2,
2337                        vmo_offset: 3,
2338                        ..Default::default()
2339                    })
2340                    .await
2341                    .unwrap();
2342
2343                reader.read_entries(&mut response).await.unwrap();
2344                assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2345
2346                std::mem::drop(proxy);
2347            }
2348        );
2349    }
2350}