block_server/
lib.rs

1// Copyright 2024 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4use anyhow::{anyhow, Error};
5use block_protocol::{BlockFifoRequest, BlockFifoResponse};
6use fidl_fuchsia_hardware_block::MAX_TRANSFER_UNBOUNDED;
7use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
8use fuchsia_sync::Mutex;
9use futures::{Future, FutureExt as _, TryStreamExt as _};
10use std::borrow::Cow;
11use std::collections::btree_map::Entry;
12use std::collections::BTreeMap;
13use std::num::NonZero;
14use std::ops::Range;
15use std::sync::atomic::AtomicU64;
16use std::sync::Arc;
17use zx::HandleBased;
18use {
19    fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_partition as fpartition,
20    fidl_fuchsia_hardware_block_volume as fvolume, fuchsia_async as fasync,
21};
22
23pub mod async_interface;
24pub mod c_interface;
25
26#[derive(Clone)]
27pub enum DeviceInfo {
28    Block(BlockInfo),
29    Partition(PartitionInfo),
30}
31
32impl DeviceInfo {
33    fn max_transfer_size(&self, block_size: u32) -> u32 {
34        let max_blocks = match self {
35            Self::Block(BlockInfo { max_transfer_blocks, .. }) => max_transfer_blocks,
36            Self::Partition(PartitionInfo { max_transfer_blocks, .. }) => max_transfer_blocks,
37        };
38        if let Some(max_blocks) = max_blocks {
39            max_blocks.get() * block_size
40        } else {
41            MAX_TRANSFER_UNBOUNDED
42        }
43    }
44}
45
46/// Information associated with non-partition block devices.
47#[derive(Clone)]
48pub struct BlockInfo {
49    pub device_flags: fblock::Flag,
50    pub block_count: u64,
51    pub max_transfer_blocks: Option<NonZero<u32>>,
52}
53
54/// Information associated with a block device that is also a partition.
55#[derive(Clone)]
56pub struct PartitionInfo {
57    /// The device flags reported by the underlying device.
58    pub device_flags: fblock::Flag,
59    pub max_transfer_blocks: Option<NonZero<u32>>,
60    /// If `block_range` is None, the partition is a volume and may not be contiguous.
61    /// In this case, the server will use the `get_volume_info` method to get the count of assigned
62    /// slices and use that (along with the slice and block sizes) to determine the block count.
63    pub block_range: Option<Range<u64>>,
64    pub type_guid: [u8; 16],
65    pub instance_guid: [u8; 16],
66    pub name: String,
67    pub flags: u64,
68}
69
70// Multiple Block I/O request may be sent as a group.
71// Notes:
72// - the group is identified by the group id in the request
73// - if using groups, a response will not be sent unless `BlockIoFlag::GROUP_LAST`
74//   flag is set.
75// - when processing a request of a group fails, subsequent requests of that
76//   group will not be processed.
77//
78// Refer to sdk/fidl/fuchsia.hardware.block.driver/block.fidl for details.
79//
80// FifoMessageGroup keeps track of the relevant BlockFifoResponse field for
81// a group requests. Only `status` and `count` needs to be updated.
82struct FifoMessageGroup {
83    status: zx::Status,
84    count: u32,
85    req_id: Option<u32>,
86}
87
88impl FifoMessageGroup {
89    fn new() -> Self {
90        FifoMessageGroup { status: zx::Status::OK, count: 0, req_id: None }
91    }
92}
93
94#[derive(Default)]
95struct FifoMessageGroups(Mutex<BTreeMap<u16, FifoMessageGroup>>);
96
97// Keeps track of all the group requests that are currently being processed
98impl FifoMessageGroups {
99    /// Completes a request and returns a response to be sent if it's the last outstanding request
100    /// for this group.
101    fn complete(&self, group_id: u16, status: zx::Status) -> Option<BlockFifoResponse> {
102        let mut map = self.0.lock();
103        let Entry::Occupied(mut o) = map.entry(group_id) else { unreachable!() };
104        let group = o.get_mut();
105        if group.count == 1 {
106            if let Some(reqid) = group.req_id {
107                let status =
108                    if group.status != zx::Status::OK { group.status } else { status }.into_raw();
109
110                o.remove();
111
112                return Some(BlockFifoResponse {
113                    status,
114                    reqid,
115                    group: group_id,
116                    ..Default::default()
117                });
118            }
119        }
120
121        group.count = group.count.checked_sub(1).unwrap();
122        if status != zx::Status::OK && group.status == zx::Status::OK {
123            group.status = status
124        }
125        None
126    }
127}
128
129/// BlockServer is an implementation of fuchsia.hardware.block.partition.Partition.
130/// cbindgen:no-export
131pub struct BlockServer<SM> {
132    block_size: u32,
133    session_manager: Arc<SM>,
134}
135
136/// A single entry in `[OffsetMap]`.
137pub struct BlockOffsetMapping {
138    source_block_offset: u64,
139    target_block_offset: u64,
140    length: u64,
141}
142
143impl std::convert::TryFrom<fblock::BlockOffsetMapping> for BlockOffsetMapping {
144    type Error = zx::Status;
145
146    fn try_from(wire: fblock::BlockOffsetMapping) -> Result<Self, Self::Error> {
147        wire.source_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
148        wire.target_block_offset.checked_add(wire.length).ok_or(zx::Status::INVALID_ARGS)?;
149        Ok(Self {
150            source_block_offset: wire.source_block_offset,
151            target_block_offset: wire.target_block_offset,
152            length: wire.length,
153        })
154    }
155}
156
157/// Remaps the offset of block requests based on an internal map.
158/// TODO(https://fxbug.dev/402515764): For now, this just supports a single entry in the map, which
159/// is all that is required for GPT partitions.  If we want to support this for FVM, we will need
160/// to support multiple entries, which requires changing the block server to support request
161/// splitting.
162pub struct OffsetMap {
163    mapping: BlockOffsetMapping,
164}
165
166impl OffsetMap {
167    pub fn new(mapping: BlockOffsetMapping) -> Self {
168        Self { mapping }
169    }
170
171    /// Adjusts the requested range, returning the new dev_offset.
172    /// Returns false if the request would exceed the range known to OffsetMap.
173    pub fn adjust_request(&self, dev_offset: u64, length: u64) -> Option<u64> {
174        if length == 0 {
175            return None;
176        }
177        let end = dev_offset.checked_add(length)?;
178        if self.mapping.source_block_offset > dev_offset
179            || end > self.mapping.source_block_offset + self.mapping.length
180        {
181            return None;
182        }
183        let delta = dev_offset - self.mapping.source_block_offset;
184        Some(self.mapping.target_block_offset + delta)
185    }
186}
187
188// Methods take Arc<Self> rather than &self because of
189// https://github.com/rust-lang/rust/issues/42940.
190pub trait SessionManager: 'static {
191    fn on_attach_vmo(
192        self: Arc<Self>,
193        vmo: &Arc<zx::Vmo>,
194    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
195
196    /// Creates a new session to handle `stream`.
197    /// The returned future should run until the session completes, for example when the client end
198    /// closes.
199    /// If `offset_map` is set, it will be used to remap the dev_offset of FIFO requests.
200    fn open_session(
201        self: Arc<Self>,
202        stream: fblock::SessionRequestStream,
203        offset_map: Option<OffsetMap>,
204        block_size: u32,
205    ) -> impl Future<Output = Result<(), Error>> + Send;
206
207    /// Called to get block/partition information for Block::GetInfo, Partition::GetTypeGuid, etc.
208    fn get_info(&self) -> impl Future<Output = Result<Cow<'_, DeviceInfo>, zx::Status>> + Send;
209
210    /// Called to handle the GetVolumeInfo FIDL call.
211    fn get_volume_info(
212        &self,
213    ) -> impl Future<Output = Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status>> + Send
214    {
215        async { Err(zx::Status::NOT_SUPPORTED) }
216    }
217
218    /// Called to handle the QuerySlices FIDL call.
219    fn query_slices(
220        &self,
221        _start_slices: &[u64],
222    ) -> impl Future<Output = Result<Vec<fvolume::VsliceRange>, zx::Status>> + Send {
223        async { Err(zx::Status::NOT_SUPPORTED) }
224    }
225
226    /// Called to handle the Shrink FIDL call.
227    fn extend(
228        &self,
229        _start_slice: u64,
230        _slice_count: u64,
231    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
232        async { Err(zx::Status::NOT_SUPPORTED) }
233    }
234
235    /// Called to handle the Shrink FIDL call.
236    fn shrink(
237        &self,
238        _start_slice: u64,
239        _slice_count: u64,
240    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
241        async { Err(zx::Status::NOT_SUPPORTED) }
242    }
243}
244
245pub trait IntoSessionManager {
246    type SM: SessionManager;
247
248    fn into_session_manager(self) -> Arc<Self::SM>;
249}
250
251impl<SM: SessionManager> BlockServer<SM> {
252    pub fn new(block_size: u32, session_manager: impl IntoSessionManager<SM = SM>) -> Self {
253        Self { block_size, session_manager: session_manager.into_session_manager() }
254    }
255
256    /// Called to process requests for fuchsia.hardware.block.volume/Volume.
257    pub async fn handle_requests(
258        &self,
259        mut requests: fvolume::VolumeRequestStream,
260    ) -> Result<(), Error> {
261        let scope = fasync::Scope::new();
262        while let Some(request) = requests.try_next().await.unwrap() {
263            if let Some(session) = self.handle_request(request).await? {
264                scope.spawn(session.map(|_| ()));
265            }
266        }
267        scope.await;
268        Ok(())
269    }
270
271    /// Processes a partition request.  If a new session task is created in response to the request,
272    /// it is returned.
273    async fn handle_request(
274        &self,
275        request: fvolume::VolumeRequest,
276    ) -> Result<Option<impl Future<Output = Result<(), Error>> + Send>, Error> {
277        match request {
278            fvolume::VolumeRequest::GetInfo { responder } => match self.device_info().await {
279                Ok(info) => {
280                    let max_transfer_size = info.max_transfer_size(self.block_size);
281                    let (block_count, flags) = match info.as_ref() {
282                        DeviceInfo::Block(BlockInfo { block_count, device_flags, .. }) => {
283                            (*block_count, *device_flags)
284                        }
285                        DeviceInfo::Partition(partition_info) => {
286                            let block_count = if let Some(range) =
287                                partition_info.block_range.as_ref()
288                            {
289                                range.end - range.start
290                            } else {
291                                let volume_info = self.session_manager.get_volume_info().await?;
292                                volume_info.0.slice_size * volume_info.1.partition_slice_count
293                                    / self.block_size as u64
294                            };
295                            (block_count, partition_info.device_flags)
296                        }
297                    };
298                    responder.send(Ok(&fblock::BlockInfo {
299                        block_count,
300                        block_size: self.block_size,
301                        max_transfer_size,
302                        flags,
303                    }))?;
304                }
305                Err(status) => responder.send(Err(status.into_raw()))?,
306            },
307            fvolume::VolumeRequest::GetStats { clear: _, responder } => {
308                // TODO(https://fxbug.dev/348077960): Implement this
309                responder.send(Err(zx::Status::NOT_SUPPORTED.into_raw()))?;
310            }
311            fvolume::VolumeRequest::OpenSession { session, control_handle: _ } => {
312                return Ok(Some(self.session_manager.clone().open_session(
313                    session.into_stream(),
314                    None,
315                    self.block_size,
316                )));
317            }
318            fvolume::VolumeRequest::OpenSessionWithOffsetMap {
319                session,
320                offset_map,
321                initial_mappings,
322                control_handle: _,
323            } => {
324                if offset_map.is_some() || initial_mappings.as_ref().is_none_or(|m| m.len() != 1) {
325                    // TODO(https://fxbug.dev/402515764): Support multiple mappings and
326                    // dynamic querying for FVM as needed.  A single static map is
327                    // sufficient for GPT.
328                    session.close_with_epitaph(zx::Status::NOT_SUPPORTED)?;
329                    return Ok(None);
330                }
331                let initial_mapping = match initial_mappings.unwrap().pop().unwrap().try_into() {
332                    Ok(m) => m,
333                    Err(status) => {
334                        session.close_with_epitaph(status)?;
335                        return Ok(None);
336                    }
337                };
338                let offset_map = OffsetMap::new(initial_mapping);
339                return Ok(Some(self.session_manager.clone().open_session(
340                    session.into_stream(),
341                    Some(offset_map),
342                    self.block_size,
343                )));
344            }
345            fvolume::VolumeRequest::GetTypeGuid { responder } => match self.device_info().await {
346                Ok(info) => {
347                    if let DeviceInfo::Partition(partition_info) = info.as_ref() {
348                        let mut guid =
349                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
350                        guid.value.copy_from_slice(&partition_info.type_guid);
351                        responder.send(zx::sys::ZX_OK, Some(&guid))?;
352                    } else {
353                        responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
354                    }
355                }
356                Err(status) => {
357                    responder.send(status.into_raw(), None)?;
358                }
359            },
360            fvolume::VolumeRequest::GetInstanceGuid { responder } => {
361                match self.device_info().await {
362                    Ok(info) => {
363                        if let DeviceInfo::Partition(partition_info) = info.as_ref() {
364                            let mut guid =
365                                fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
366                            guid.value.copy_from_slice(&partition_info.instance_guid);
367                            responder.send(zx::sys::ZX_OK, Some(&guid))?;
368                        } else {
369                            responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
370                        }
371                    }
372                    Err(status) => {
373                        responder.send(status.into_raw(), None)?;
374                    }
375                }
376            }
377            fvolume::VolumeRequest::GetName { responder } => match self.device_info().await {
378                Ok(info) => {
379                    if let DeviceInfo::Partition(partition_info) = info.as_ref() {
380                        responder.send(zx::sys::ZX_OK, Some(&partition_info.name))?;
381                    } else {
382                        responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED, None)?;
383                    }
384                }
385                Err(status) => {
386                    responder.send(status.into_raw(), None)?;
387                }
388            },
389            fvolume::VolumeRequest::GetMetadata { responder } => match self.device_info().await {
390                Ok(info) => {
391                    if let DeviceInfo::Partition(info) = info.as_ref() {
392                        let mut type_guid =
393                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
394                        type_guid.value.copy_from_slice(&info.type_guid);
395                        let mut instance_guid =
396                            fpartition::Guid { value: [0u8; fpartition::GUID_LENGTH as usize] };
397                        instance_guid.value.copy_from_slice(&info.instance_guid);
398                        responder.send(Ok(&fpartition::PartitionGetMetadataResponse {
399                            name: Some(info.name.clone()),
400                            type_guid: Some(type_guid),
401                            instance_guid: Some(instance_guid),
402                            start_block_offset: info.block_range.as_ref().map(|range| range.start),
403                            num_blocks: info
404                                .block_range
405                                .as_ref()
406                                .map(|range| range.end - range.start),
407                            flags: Some(info.flags),
408                            ..Default::default()
409                        }))?;
410                    }
411                }
412                Err(status) => responder.send(Err(status.into_raw()))?,
413            },
414            fvolume::VolumeRequest::QuerySlices { responder, start_slices } => {
415                match self.session_manager.query_slices(&start_slices).await {
416                    Ok(mut results) => {
417                        let results_len = results.len();
418                        assert!(results_len <= 16);
419                        results.resize(16, fvolume::VsliceRange { allocated: false, count: 0 });
420                        responder.send(
421                            zx::sys::ZX_OK,
422                            &results.try_into().unwrap(),
423                            results_len as u64,
424                        )?;
425                    }
426                    Err(s) => {
427                        responder.send(
428                            s.into_raw(),
429                            &[fvolume::VsliceRange { allocated: false, count: 0 }; 16],
430                            0,
431                        )?;
432                    }
433                }
434            }
435            fvolume::VolumeRequest::GetVolumeInfo { responder, .. } => {
436                match self.session_manager.get_volume_info().await {
437                    Ok((manager_info, volume_info)) => {
438                        responder.send(zx::sys::ZX_OK, Some(&manager_info), Some(&volume_info))?
439                    }
440                    Err(s) => responder.send(s.into_raw(), None, None)?,
441                }
442            }
443            fvolume::VolumeRequest::Extend { responder, start_slice, slice_count } => {
444                responder.send(
445                    zx::Status::from(self.session_manager.extend(start_slice, slice_count).await)
446                        .into_raw(),
447                )?;
448            }
449            fvolume::VolumeRequest::Shrink { responder, start_slice, slice_count } => {
450                responder.send(
451                    zx::Status::from(self.session_manager.shrink(start_slice, slice_count).await)
452                        .into_raw(),
453                )?;
454            }
455            fvolume::VolumeRequest::Destroy { responder, .. } => {
456                responder.send(zx::sys::ZX_ERR_NOT_SUPPORTED)?;
457            }
458        }
459        Ok(None)
460    }
461
462    async fn device_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
463        self.session_manager.get_info().await
464    }
465}
466
467struct SessionHelper<SM: SessionManager> {
468    session_manager: Arc<SM>,
469    offset_map: Option<OffsetMap>,
470    block_size: u32,
471    peer_fifo: zx::Fifo<BlockFifoResponse, BlockFifoRequest>,
472    vmos: Mutex<BTreeMap<u16, Arc<zx::Vmo>>>,
473    message_groups: FifoMessageGroups,
474}
475
476impl<SM: SessionManager> SessionHelper<SM> {
477    fn new(
478        session_manager: Arc<SM>,
479        offset_map: Option<OffsetMap>,
480        block_size: u32,
481    ) -> Result<(Self, zx::Fifo<BlockFifoRequest, BlockFifoResponse>), zx::Status> {
482        let (peer_fifo, fifo) = zx::Fifo::create(16)?;
483        Ok((
484            Self {
485                session_manager,
486                offset_map,
487                block_size,
488                peer_fifo,
489                vmos: Mutex::default(),
490                message_groups: FifoMessageGroups::default(),
491            },
492            fifo,
493        ))
494    }
495
496    async fn handle_request(&self, request: fblock::SessionRequest) -> Result<(), Error> {
497        match request {
498            fblock::SessionRequest::GetFifo { responder } => {
499                let rights = zx::Rights::TRANSFER
500                    | zx::Rights::READ
501                    | zx::Rights::WRITE
502                    | zx::Rights::SIGNAL
503                    | zx::Rights::WAIT;
504                match self.peer_fifo.duplicate_handle(rights) {
505                    Ok(fifo) => responder.send(Ok(fifo.downcast()))?,
506                    Err(s) => responder.send(Err(s.into_raw()))?,
507                }
508                Ok(())
509            }
510            fblock::SessionRequest::AttachVmo { vmo, responder } => {
511                let vmo = Arc::new(vmo);
512                let vmo_id = {
513                    let mut vmos = self.vmos.lock();
514                    if vmos.len() == u16::MAX as usize {
515                        responder.send(Err(zx::Status::NO_RESOURCES.into_raw()))?;
516                        return Ok(());
517                    } else {
518                        let vmo_id = match vmos.last_entry() {
519                            None => 1,
520                            Some(o) => {
521                                o.key().checked_add(1).unwrap_or_else(|| {
522                                    let mut vmo_id = 1;
523                                    // Find the first gap...
524                                    for (&id, _) in &*vmos {
525                                        if id > vmo_id {
526                                            break;
527                                        }
528                                        vmo_id = id + 1;
529                                    }
530                                    vmo_id
531                                })
532                            }
533                        };
534                        vmos.insert(vmo_id, vmo.clone());
535                        vmo_id
536                    }
537                };
538                self.session_manager.clone().on_attach_vmo(&vmo).await?;
539                responder.send(Ok(&fblock::VmoId { id: vmo_id }))?;
540                Ok(())
541            }
542            fblock::SessionRequest::Close { responder } => {
543                responder.send(Ok(()))?;
544                Err(anyhow!("Closed"))
545            }
546        }
547    }
548
549    fn finish_fifo_request(
550        &self,
551        request: RequestTracking,
552        status: zx::Status,
553    ) -> Option<BlockFifoResponse> {
554        match request.group_or_request {
555            GroupOrRequest::Group(group_id) => {
556                let response = self.message_groups.complete(group_id, status);
557                fuchsia_trace::duration!(
558                    c"storage",
559                    c"block_server::finish_transaction_in_group",
560                    "group" => u32::from(group_id),
561                    "group_completed" => response.is_some(),
562                    "status" => status.into_raw());
563                if let Some(trace_flow_id) = request.trace_flow_id {
564                    fuchsia_trace::flow_step!(
565                        c"storage",
566                        c"block_server::finish_transaction",
567                        trace_flow_id.get().into()
568                    );
569                }
570                response
571            }
572            GroupOrRequest::Request(reqid) => {
573                fuchsia_trace::duration!(
574                    c"storage", c"block_server::finish_transaction", "status" => status.into_raw());
575                if let Some(trace_flow_id) = request.trace_flow_id {
576                    fuchsia_trace::flow_step!(
577                        c"storage",
578                        c"block_server::finish_transaction",
579                        trace_flow_id.get().into()
580                    );
581                }
582                Some(BlockFifoResponse { status: status.into_raw(), reqid, ..Default::default() })
583            }
584        }
585    }
586
587    fn decode_fifo_request(&self, request: &BlockFifoRequest) -> Option<DecodedRequest> {
588        let flags = BlockIoFlag::from_bits_truncate(request.command.flags);
589        let is_group = flags.contains(BlockIoFlag::GROUP_ITEM);
590        let last_in_group = flags.contains(BlockIoFlag::GROUP_LAST);
591        let mut op_code =
592            BlockOpcode::from_primitive(request.command.opcode).ok_or(zx::Status::INVALID_ARGS);
593
594        let group_or_request = if is_group {
595            let mut groups = self.message_groups.0.lock();
596            let group = groups.entry(request.group).or_insert_with(|| FifoMessageGroup::new());
597            if group.req_id.is_some() {
598                // We have already received a request tagged as last.
599                if group.status == zx::Status::OK {
600                    group.status = zx::Status::INVALID_ARGS;
601                }
602                return None;
603            }
604            if last_in_group {
605                group.req_id = Some(request.reqid);
606                // If the group has had an error, there is no point trying to issue this request.
607                if group.status != zx::Status::OK {
608                    op_code = Err(group.status);
609                }
610            } else if group.status != zx::Status::OK {
611                // The group has already encountered an error, so there is no point trying to issue
612                // this request.
613                return None;
614            }
615            group.count += 1;
616            GroupOrRequest::Group(request.group)
617        } else {
618            GroupOrRequest::Request(request.reqid)
619        };
620
621        let mut vmo_offset = 0;
622        let vmo = match op_code {
623            Ok(BlockOpcode::Read) | Ok(BlockOpcode::Write) => (|| {
624                if request.length == 0 {
625                    return Err(zx::Status::INVALID_ARGS);
626                }
627                vmo_offset = request
628                    .vmo_offset
629                    .checked_mul(self.block_size as u64)
630                    .ok_or(zx::Status::OUT_OF_RANGE)?;
631                self.vmos
632                    .lock()
633                    .get(&request.vmoid)
634                    .cloned()
635                    .map_or(Err(zx::Status::IO), |vmo| Ok(Some(vmo)))
636            })(),
637            Ok(BlockOpcode::CloseVmo) => self
638                .vmos
639                .lock()
640                .remove(&request.vmoid)
641                .map_or(Err(zx::Status::IO), |vmo| Ok(Some(vmo))),
642            _ => Ok(None),
643        }
644        .unwrap_or_else(|e| {
645            op_code = Err(e);
646            None
647        });
648
649        let operation = op_code.map(|code| match code {
650            BlockOpcode::Read => Operation::Read {
651                device_block_offset: request.dev_offset,
652                block_count: request.length,
653                _unused: 0,
654                vmo_offset,
655            },
656            BlockOpcode::Write => Operation::Write {
657                device_block_offset: request.dev_offset,
658                block_count: request.length,
659                options: if flags.contains(BlockIoFlag::FORCE_ACCESS) {
660                    WriteOptions::FORCE_ACCESS
661                } else {
662                    WriteOptions::empty()
663                },
664                vmo_offset,
665            },
666            BlockOpcode::Flush => Operation::Flush,
667            BlockOpcode::Trim => Operation::Trim {
668                device_block_offset: request.dev_offset,
669                block_count: request.length,
670            },
671            BlockOpcode::CloseVmo => Operation::CloseVmo,
672        });
673        if let Ok(operation) = operation.as_ref() {
674            use fuchsia_trace::ArgValue;
675            static CACHE: AtomicU64 = AtomicU64::new(0);
676            if let Some(context) =
677                fuchsia_trace::TraceCategoryContext::acquire_cached(c"storage", &CACHE)
678            {
679                let trace_args_with_group = [
680                    ArgValue::of("group", u32::from(group_or_request.group_id())),
681                    ArgValue::of("opcode", operation.trace_label()),
682                ];
683                let trace_args = [ArgValue::of("opcode", operation.trace_label())];
684                let _scope = if group_or_request.is_group() {
685                    fuchsia_trace::duration(
686                        c"storage",
687                        c"block_server::start_transaction",
688                        &trace_args_with_group,
689                    )
690                } else {
691                    fuchsia_trace::duration(
692                        c"storage",
693                        c"block_server::start_transaction",
694                        &trace_args,
695                    )
696                };
697                let trace_flow_id = NonZero::new(request.trace_flow_id);
698                if let Some(trace_flow_id) = trace_flow_id.clone() {
699                    fuchsia_trace::flow_step(
700                        &context,
701                        c"block_server::start_trnsaction",
702                        trace_flow_id.get().into(),
703                        &[],
704                    );
705                }
706            }
707        }
708        Some(DecodedRequest::new(
709            RequestTracking {
710                group_or_request,
711                trace_flow_id: NonZero::new(request.trace_flow_id),
712            },
713            operation,
714            vmo,
715            self.offset_map.as_ref(),
716        ))
717    }
718
719    fn take_vmos(&self) -> BTreeMap<u16, Arc<zx::Vmo>> {
720        std::mem::take(&mut *self.vmos.lock())
721    }
722}
723
724#[derive(Debug)]
725struct DecodedRequest {
726    request_tracking: RequestTracking,
727    operation: Result<Operation, zx::Status>,
728    vmo: Option<Arc<zx::Vmo>>,
729}
730
731impl DecodedRequest {
732    fn new(
733        request_tracking: RequestTracking,
734        operation: Result<Operation, zx::Status>,
735        vmo: Option<Arc<zx::Vmo>>,
736        offset_map: Option<&OffsetMap>,
737    ) -> Self {
738        let operation =
739            operation.and_then(|operation| operation.validate_and_adjust_range(offset_map));
740        Self { request_tracking, operation, vmo }
741    }
742}
743
744/// cbindgen:no-export
745pub type WriteOptions = block_protocol::WriteOptions;
746
747#[repr(C)]
748#[derive(Debug)]
749pub enum Operation {
750    // NOTE: On the C++ side, this ends up as a union and, for efficiency reasons, there is code
751    // that assumes that some fields for reads and writes (and possibly trim) line-up (e.g. common
752    // code can read `device_block_offset` from the read variant and then assume it's valid for the
753    // write variant).
754    Read {
755        device_block_offset: u64,
756        block_count: u32,
757        _unused: u32,
758        vmo_offset: u64,
759    },
760    Write {
761        device_block_offset: u64,
762        block_count: u32,
763        options: WriteOptions,
764        vmo_offset: u64,
765    },
766    Flush,
767    Trim {
768        device_block_offset: u64,
769        block_count: u32,
770    },
771    /// This will never be seen by the C interface.
772    CloseVmo,
773}
774
775impl Operation {
776    // Adjusts the operation's block range via `OffsetMap`.  If the request would exceed the range
777    // known to the map, or is otherwise invalid, returns an error.
778    fn validate_and_adjust_range(
779        mut self,
780        offset_map: Option<&OffsetMap>,
781    ) -> Result<Operation, zx::Status> {
782        let adjust_offset = |dev_offset, length| {
783            // For compatibility with the C++ server, always ensure the length is nonzero
784            if length == 0 {
785                return Err(zx::Status::INVALID_ARGS);
786            }
787            if let Some(offset_map) = offset_map {
788                offset_map.adjust_request(dev_offset, length as u64).ok_or(zx::Status::OUT_OF_RANGE)
789            } else {
790                Ok(dev_offset)
791            }
792        };
793        match &mut self {
794            Operation::Read { device_block_offset, block_count, .. } => {
795                *device_block_offset = adjust_offset(*device_block_offset, *block_count)?;
796            }
797            Operation::Write { device_block_offset, block_count, .. } => {
798                *device_block_offset = adjust_offset(*device_block_offset, *block_count)?;
799            }
800            Operation::Trim { device_block_offset, block_count, .. } => {
801                *device_block_offset = adjust_offset(*device_block_offset, *block_count)?;
802            }
803            _ => {}
804        }
805        Ok(self)
806    }
807    fn trace_label(&self) -> &'static str {
808        match self {
809            Operation::Read { .. } => "read",
810            Operation::Write { .. } => "write",
811            Operation::Flush { .. } => "flush",
812            Operation::Trim { .. } => "trim",
813            Operation::CloseVmo { .. } => "close_vmo",
814        }
815    }
816}
817
818#[derive(Clone, Copy, Debug)]
819pub struct RequestTracking {
820    group_or_request: GroupOrRequest,
821    trace_flow_id: Option<NonZero<u64>>,
822}
823
824#[derive(Clone, Copy, Debug)]
825pub enum GroupOrRequest {
826    Group(u16),
827    Request(u32),
828}
829
830impl GroupOrRequest {
831    fn is_group(&self) -> bool {
832        if let Self::Group(_) = self {
833            true
834        } else {
835            false
836        }
837    }
838
839    fn group_id(&self) -> u16 {
840        match self {
841            Self::Group(id) => *id,
842            Self::Request(_) => 0,
843        }
844    }
845}
846
847/// cbindgen:ignore
848const IS_GROUP: u64 = 0x8000_0000_0000_0000;
849/// cbindgen:ignore
850const USED_VMO: u64 = 0x4000_0000_0000_0000;
851#[repr(transparent)]
852#[derive(Clone, Copy, Eq, PartialEq, Hash)]
853pub struct RequestId(u64);
854
855impl RequestId {
856    /// Marks the request as having used a VMO, so that we keep the VMO alive until
857    /// the request has finished.
858    fn with_vmo(self) -> Self {
859        Self(self.0 | USED_VMO)
860    }
861
862    /// Returns whether the request ID indicates a VMO was used.
863    fn did_have_vmo(&self) -> bool {
864        self.0 & USED_VMO != 0
865    }
866}
867
868impl From<GroupOrRequest> for RequestId {
869    fn from(value: GroupOrRequest) -> Self {
870        match value {
871            GroupOrRequest::Group(group) => RequestId(group as u64 | IS_GROUP),
872            GroupOrRequest::Request(request) => RequestId(request as u64),
873        }
874    }
875}
876
877impl From<RequestId> for GroupOrRequest {
878    fn from(value: RequestId) -> Self {
879        if value.0 & IS_GROUP == 0 {
880            GroupOrRequest::Request(value.0 as u32)
881        } else {
882            GroupOrRequest::Group(value.0 as u16)
883        }
884    }
885}
886
887#[cfg(test)]
888mod tests {
889    use super::{BlockServer, DeviceInfo, PartitionInfo};
890    use crate::async_interface::FIFO_MAX_REQUESTS;
891    use assert_matches::assert_matches;
892    use block_protocol::{BlockFifoCommand, BlockFifoRequest, BlockFifoResponse, WriteOptions};
893    use fidl_fuchsia_hardware_block_driver::{BlockIoFlag, BlockOpcode};
894    use fuchsia_sync::Mutex;
895    use futures::channel::oneshot;
896    use futures::future::BoxFuture;
897    use futures::FutureExt as _;
898    use std::borrow::Cow;
899    use std::future::poll_fn;
900    use std::num::NonZero;
901    use std::pin::pin;
902    use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
903    use std::sync::Arc;
904    use std::task::{Context, Poll};
905    use zx::{AsHandleRef as _, HandleBased as _};
906    use {
907        fidl_fuchsia_hardware_block as fblock, fidl_fuchsia_hardware_block_volume as fvolume,
908        fuchsia_async as fasync,
909    };
910
911    #[derive(Default)]
912    struct MockInterface {
913        read_hook: Option<
914            Box<
915                dyn Fn(u64, u32, &Arc<zx::Vmo>, u64) -> BoxFuture<'static, Result<(), zx::Status>>
916                    + Send
917                    + Sync,
918            >,
919        >,
920    }
921
922    impl super::async_interface::Interface for MockInterface {
923        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
924            Ok(())
925        }
926
927        async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
928            Ok(Cow::Owned(test_device_info()))
929        }
930
931        async fn read(
932            &self,
933            device_block_offset: u64,
934            block_count: u32,
935            vmo: &Arc<zx::Vmo>,
936            vmo_offset: u64,
937            _trace_flow_id: Option<NonZero<u64>>,
938        ) -> Result<(), zx::Status> {
939            if let Some(read_hook) = &self.read_hook {
940                read_hook(device_block_offset, block_count, vmo, vmo_offset).await
941            } else {
942                unimplemented!();
943            }
944        }
945
946        async fn write(
947            &self,
948            _device_block_offset: u64,
949            _block_count: u32,
950            _vmo: &Arc<zx::Vmo>,
951            _vmo_offset: u64,
952            _opts: WriteOptions,
953            _trace_flow_id: Option<NonZero<u64>>,
954        ) -> Result<(), zx::Status> {
955            unreachable!();
956        }
957
958        async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
959            unreachable!();
960        }
961
962        async fn trim(
963            &self,
964            _device_block_offset: u64,
965            _block_count: u32,
966            _trace_flow_id: Option<NonZero<u64>>,
967        ) -> Result<(), zx::Status> {
968            unreachable!();
969        }
970
971        async fn get_volume_info(
972            &self,
973        ) -> Result<(fvolume::VolumeManagerInfo, fvolume::VolumeInfo), zx::Status> {
974            // Hang forever for the test_requests_dont_block_sessions test.
975            let () = std::future::pending().await;
976            unreachable!();
977        }
978    }
979
980    const BLOCK_SIZE: u32 = 512;
981
982    fn test_device_info() -> DeviceInfo {
983        DeviceInfo::Partition(PartitionInfo {
984            device_flags: fblock::Flag::READONLY,
985            max_transfer_blocks: None,
986            block_range: Some(12..34),
987            type_guid: [1; 16],
988            instance_guid: [2; 16],
989            name: "foo".to_string(),
990            flags: 0xabcd,
991        })
992    }
993
994    #[fuchsia::test]
995    async fn test_info() {
996        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
997
998        futures::join!(
999            async {
1000                let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1001                block_server.handle_requests(stream).await.unwrap();
1002            },
1003            async {
1004                let expected_info = test_device_info();
1005                let partition_info = if let DeviceInfo::Partition(info) = &expected_info {
1006                    info
1007                } else {
1008                    unreachable!()
1009                };
1010
1011                let block_info = proxy.get_info().await.unwrap().unwrap();
1012                assert_eq!(
1013                    block_info.block_count,
1014                    partition_info.block_range.as_ref().unwrap().end
1015                        - partition_info.block_range.as_ref().unwrap().start
1016                );
1017                assert_eq!(block_info.flags, fblock::Flag::READONLY);
1018
1019                // TODO(https://fxbug.dev/348077960): Check max_transfer_size
1020
1021                let (status, type_guid) = proxy.get_type_guid().await.unwrap();
1022                assert_eq!(status, zx::sys::ZX_OK);
1023                assert_eq!(&type_guid.as_ref().unwrap().value, &partition_info.type_guid);
1024
1025                let (status, instance_guid) = proxy.get_instance_guid().await.unwrap();
1026                assert_eq!(status, zx::sys::ZX_OK);
1027                assert_eq!(&instance_guid.as_ref().unwrap().value, &partition_info.instance_guid);
1028
1029                let (status, name) = proxy.get_name().await.unwrap();
1030                assert_eq!(status, zx::sys::ZX_OK);
1031                assert_eq!(name.as_ref(), Some(&partition_info.name));
1032
1033                let metadata = proxy.get_metadata().await.unwrap().expect("get_flags failed");
1034                assert_eq!(metadata.name, name);
1035                assert_eq!(metadata.type_guid.as_ref(), type_guid.as_deref());
1036                assert_eq!(metadata.instance_guid.as_ref(), instance_guid.as_deref());
1037                assert_eq!(
1038                    metadata.start_block_offset,
1039                    Some(partition_info.block_range.as_ref().unwrap().start)
1040                );
1041                assert_eq!(metadata.num_blocks, Some(block_info.block_count));
1042                assert_eq!(metadata.flags, Some(partition_info.flags));
1043
1044                std::mem::drop(proxy);
1045            }
1046        );
1047    }
1048
1049    #[fuchsia::test]
1050    async fn test_attach_vmo() {
1051        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1052
1053        let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1054        let koid = vmo.get_koid().unwrap();
1055
1056        futures::join!(
1057            async {
1058                let block_server = BlockServer::new(
1059                    BLOCK_SIZE,
1060                    Arc::new(MockInterface {
1061                        read_hook: Some(Box::new(move |_, _, vmo, _| {
1062                            assert_eq!(vmo.get_koid().unwrap(), koid);
1063                            Box::pin(async { Ok(()) })
1064                        })),
1065                        ..MockInterface::default()
1066                    }),
1067                );
1068                block_server.handle_requests(stream).await.unwrap();
1069            },
1070            async move {
1071                let (session_proxy, server) = fidl::endpoints::create_proxy();
1072
1073                proxy.open_session(server).unwrap();
1074
1075                let vmo_id = session_proxy
1076                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1077                    .await
1078                    .unwrap()
1079                    .unwrap();
1080                assert_ne!(vmo_id.id, 0);
1081
1082                let mut fifo =
1083                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1084                let (mut reader, mut writer) = fifo.async_io();
1085
1086                // Keep attaching VMOs until we eventually hit the maximum.
1087                let mut count = 1;
1088                loop {
1089                    match session_proxy
1090                        .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1091                        .await
1092                        .unwrap()
1093                    {
1094                        Ok(vmo_id) => assert_ne!(vmo_id.id, 0),
1095                        Err(e) => {
1096                            assert_eq!(e, zx::sys::ZX_ERR_NO_RESOURCES);
1097                            break;
1098                        }
1099                    }
1100
1101                    // Only test every 10 to keep test time down.
1102                    if count % 10 == 0 {
1103                        writer
1104                            .write_entries(&BlockFifoRequest {
1105                                command: BlockFifoCommand {
1106                                    opcode: BlockOpcode::Read.into_primitive(),
1107                                    ..Default::default()
1108                                },
1109                                vmoid: vmo_id.id,
1110                                length: 1,
1111                                ..Default::default()
1112                            })
1113                            .await
1114                            .unwrap();
1115
1116                        let mut response = BlockFifoResponse::default();
1117                        reader.read_entries(&mut response).await.unwrap();
1118                        assert_eq!(response.status, zx::sys::ZX_OK);
1119                    }
1120
1121                    count += 1;
1122                }
1123
1124                assert_eq!(count, u16::MAX as u64);
1125
1126                // Detach the original VMO, and make sure we can then attach another one.
1127                writer
1128                    .write_entries(&BlockFifoRequest {
1129                        command: BlockFifoCommand {
1130                            opcode: BlockOpcode::CloseVmo.into_primitive(),
1131                            ..Default::default()
1132                        },
1133                        vmoid: vmo_id.id,
1134                        ..Default::default()
1135                    })
1136                    .await
1137                    .unwrap();
1138
1139                let mut response = BlockFifoResponse::default();
1140                reader.read_entries(&mut response).await.unwrap();
1141                assert_eq!(response.status, zx::sys::ZX_OK);
1142
1143                let new_vmo_id = session_proxy
1144                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1145                    .await
1146                    .unwrap()
1147                    .unwrap();
1148                // It should reuse the same ID.
1149                assert_eq!(new_vmo_id.id, vmo_id.id);
1150
1151                std::mem::drop(proxy);
1152            }
1153        );
1154    }
1155
1156    #[fuchsia::test]
1157    async fn test_close() {
1158        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1159
1160        let mut server = std::pin::pin!(async {
1161            let block_server = BlockServer::new(BLOCK_SIZE, Arc::new(MockInterface::default()));
1162            block_server.handle_requests(stream).await.unwrap();
1163        }
1164        .fuse());
1165
1166        let mut client = std::pin::pin!(async {
1167            let (session_proxy, server) = fidl::endpoints::create_proxy();
1168
1169            proxy.open_session(server).unwrap();
1170
1171            // Dropping the proxy should not cause the session to terminate because the session is
1172            // still live.
1173            std::mem::drop(proxy);
1174
1175            session_proxy.close().await.unwrap().unwrap();
1176
1177            // Keep the session alive.  Calling `close` should cause the server to terminate.
1178            let _: () = std::future::pending().await;
1179        }
1180        .fuse());
1181
1182        futures::select!(
1183            _ = server => {}
1184            _ = client => unreachable!(),
1185        );
1186    }
1187
1188    #[derive(Default)]
1189    struct IoMockInterface {
1190        do_checks: bool,
1191        expected_op: Arc<Mutex<Option<ExpectedOp>>>,
1192        return_errors: bool,
1193    }
1194
1195    #[derive(Debug)]
1196    enum ExpectedOp {
1197        Read(u64, u32, u64),
1198        Write(u64, u32, u64),
1199        Trim(u64, u32),
1200        Flush,
1201    }
1202
1203    impl super::async_interface::Interface for IoMockInterface {
1204        async fn on_attach_vmo(&self, _vmo: &zx::Vmo) -> Result<(), zx::Status> {
1205            Ok(())
1206        }
1207
1208        async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1209            Ok(Cow::Owned(test_device_info()))
1210        }
1211
1212        async fn read(
1213            &self,
1214            device_block_offset: u64,
1215            block_count: u32,
1216            _vmo: &Arc<zx::Vmo>,
1217            vmo_offset: u64,
1218            _trace_flow_id: Option<NonZero<u64>>,
1219        ) -> Result<(), zx::Status> {
1220            if self.return_errors {
1221                Err(zx::Status::INTERNAL)
1222            } else {
1223                if self.do_checks {
1224                    assert_matches!(
1225                        self.expected_op.lock().take(),
1226                        Some(ExpectedOp::Read(a, b, c)) if device_block_offset == a &&
1227                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1228                        "Read {device_block_offset} {block_count} {vmo_offset}"
1229                    );
1230                }
1231                Ok(())
1232            }
1233        }
1234
1235        async fn write(
1236            &self,
1237            device_block_offset: u64,
1238            block_count: u32,
1239            _vmo: &Arc<zx::Vmo>,
1240            vmo_offset: u64,
1241            _opts: WriteOptions,
1242            _trace_flow_id: Option<NonZero<u64>>,
1243        ) -> Result<(), zx::Status> {
1244            if self.return_errors {
1245                Err(zx::Status::NOT_SUPPORTED)
1246            } else {
1247                if self.do_checks {
1248                    assert_matches!(
1249                        self.expected_op.lock().take(),
1250                        Some(ExpectedOp::Write(a, b, c)) if device_block_offset == a &&
1251                            block_count == b && vmo_offset / BLOCK_SIZE as u64 == c,
1252                        "Write {device_block_offset} {block_count} {vmo_offset}"
1253                    );
1254                }
1255                Ok(())
1256            }
1257        }
1258
1259        async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1260            if self.return_errors {
1261                Err(zx::Status::NO_RESOURCES)
1262            } else {
1263                if self.do_checks {
1264                    assert_matches!(self.expected_op.lock().take(), Some(ExpectedOp::Flush));
1265                }
1266                Ok(())
1267            }
1268        }
1269
1270        async fn trim(
1271            &self,
1272            device_block_offset: u64,
1273            block_count: u32,
1274            _trace_flow_id: Option<NonZero<u64>>,
1275        ) -> Result<(), zx::Status> {
1276            if self.return_errors {
1277                Err(zx::Status::NO_MEMORY)
1278            } else {
1279                if self.do_checks {
1280                    assert_matches!(
1281                        self.expected_op.lock().take(),
1282                        Some(ExpectedOp::Trim(a, b)) if device_block_offset == a &&
1283                            block_count == b,
1284                        "Trim {device_block_offset} {block_count}"
1285                    );
1286                }
1287                Ok(())
1288            }
1289        }
1290    }
1291
1292    #[fuchsia::test]
1293    async fn test_io() {
1294        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1295
1296        let expected_op = Arc::new(Mutex::new(None));
1297        let expected_op_clone = expected_op.clone();
1298
1299        let server = async {
1300            let block_server = BlockServer::new(
1301                BLOCK_SIZE,
1302                Arc::new(IoMockInterface {
1303                    return_errors: false,
1304                    do_checks: true,
1305                    expected_op: expected_op_clone,
1306                }),
1307            );
1308            block_server.handle_requests(stream).await.unwrap();
1309        };
1310
1311        let client = async move {
1312            let (session_proxy, server) = fidl::endpoints::create_proxy();
1313
1314            proxy.open_session(server).unwrap();
1315
1316            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1317            let vmo_id = session_proxy
1318                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1319                .await
1320                .unwrap()
1321                .unwrap();
1322
1323            let mut fifo =
1324                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1325            let (mut reader, mut writer) = fifo.async_io();
1326
1327            // READ
1328            *expected_op.lock() = Some(ExpectedOp::Read(1, 2, 3));
1329            writer
1330                .write_entries(&BlockFifoRequest {
1331                    command: BlockFifoCommand {
1332                        opcode: BlockOpcode::Read.into_primitive(),
1333                        ..Default::default()
1334                    },
1335                    vmoid: vmo_id.id,
1336                    dev_offset: 1,
1337                    length: 2,
1338                    vmo_offset: 3,
1339                    ..Default::default()
1340                })
1341                .await
1342                .unwrap();
1343
1344            let mut response = BlockFifoResponse::default();
1345            reader.read_entries(&mut response).await.unwrap();
1346            assert_eq!(response.status, zx::sys::ZX_OK);
1347
1348            // WRITE
1349            *expected_op.lock() = Some(ExpectedOp::Write(4, 5, 6));
1350            writer
1351                .write_entries(&BlockFifoRequest {
1352                    command: BlockFifoCommand {
1353                        opcode: BlockOpcode::Write.into_primitive(),
1354                        ..Default::default()
1355                    },
1356                    vmoid: vmo_id.id,
1357                    dev_offset: 4,
1358                    length: 5,
1359                    vmo_offset: 6,
1360                    ..Default::default()
1361                })
1362                .await
1363                .unwrap();
1364
1365            let mut response = BlockFifoResponse::default();
1366            reader.read_entries(&mut response).await.unwrap();
1367            assert_eq!(response.status, zx::sys::ZX_OK);
1368
1369            // FLUSH
1370            *expected_op.lock() = Some(ExpectedOp::Flush);
1371            writer
1372                .write_entries(&BlockFifoRequest {
1373                    command: BlockFifoCommand {
1374                        opcode: BlockOpcode::Flush.into_primitive(),
1375                        ..Default::default()
1376                    },
1377                    ..Default::default()
1378                })
1379                .await
1380                .unwrap();
1381
1382            reader.read_entries(&mut response).await.unwrap();
1383            assert_eq!(response.status, zx::sys::ZX_OK);
1384
1385            // TRIM
1386            *expected_op.lock() = Some(ExpectedOp::Trim(7, 8));
1387            writer
1388                .write_entries(&BlockFifoRequest {
1389                    command: BlockFifoCommand {
1390                        opcode: BlockOpcode::Trim.into_primitive(),
1391                        ..Default::default()
1392                    },
1393                    dev_offset: 7,
1394                    length: 8,
1395                    ..Default::default()
1396                })
1397                .await
1398                .unwrap();
1399
1400            reader.read_entries(&mut response).await.unwrap();
1401            assert_eq!(response.status, zx::sys::ZX_OK);
1402
1403            std::mem::drop(proxy);
1404        };
1405
1406        futures::join!(server, client);
1407    }
1408
1409    #[fuchsia::test]
1410    async fn test_io_errors() {
1411        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1412
1413        futures::join!(
1414            async {
1415                let block_server = BlockServer::new(
1416                    BLOCK_SIZE,
1417                    Arc::new(IoMockInterface {
1418                        return_errors: true,
1419                        do_checks: false,
1420                        expected_op: Arc::new(Mutex::new(None)),
1421                    }),
1422                );
1423                block_server.handle_requests(stream).await.unwrap();
1424            },
1425            async move {
1426                let (session_proxy, server) = fidl::endpoints::create_proxy();
1427
1428                proxy.open_session(server).unwrap();
1429
1430                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1431                let vmo_id = session_proxy
1432                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1433                    .await
1434                    .unwrap()
1435                    .unwrap();
1436
1437                let mut fifo =
1438                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1439                let (mut reader, mut writer) = fifo.async_io();
1440
1441                // READ
1442                writer
1443                    .write_entries(&BlockFifoRequest {
1444                        command: BlockFifoCommand {
1445                            opcode: BlockOpcode::Read.into_primitive(),
1446                            ..Default::default()
1447                        },
1448                        vmoid: vmo_id.id,
1449                        length: 1,
1450                        reqid: 1,
1451                        ..Default::default()
1452                    })
1453                    .await
1454                    .unwrap();
1455
1456                let mut response = BlockFifoResponse::default();
1457                reader.read_entries(&mut response).await.unwrap();
1458                assert_eq!(response.status, zx::sys::ZX_ERR_INTERNAL);
1459
1460                // WRITE
1461                writer
1462                    .write_entries(&BlockFifoRequest {
1463                        command: BlockFifoCommand {
1464                            opcode: BlockOpcode::Write.into_primitive(),
1465                            ..Default::default()
1466                        },
1467                        vmoid: vmo_id.id,
1468                        length: 1,
1469                        reqid: 2,
1470                        ..Default::default()
1471                    })
1472                    .await
1473                    .unwrap();
1474
1475                reader.read_entries(&mut response).await.unwrap();
1476                assert_eq!(response.status, zx::sys::ZX_ERR_NOT_SUPPORTED);
1477
1478                // FLUSH
1479                writer
1480                    .write_entries(&BlockFifoRequest {
1481                        command: BlockFifoCommand {
1482                            opcode: BlockOpcode::Flush.into_primitive(),
1483                            ..Default::default()
1484                        },
1485                        reqid: 3,
1486                        ..Default::default()
1487                    })
1488                    .await
1489                    .unwrap();
1490
1491                reader.read_entries(&mut response).await.unwrap();
1492                assert_eq!(response.status, zx::sys::ZX_ERR_NO_RESOURCES);
1493
1494                // TRIM
1495                writer
1496                    .write_entries(&BlockFifoRequest {
1497                        command: BlockFifoCommand {
1498                            opcode: BlockOpcode::Trim.into_primitive(),
1499                            ..Default::default()
1500                        },
1501                        reqid: 4,
1502                        length: 1,
1503                        ..Default::default()
1504                    })
1505                    .await
1506                    .unwrap();
1507
1508                reader.read_entries(&mut response).await.unwrap();
1509                assert_eq!(response.status, zx::sys::ZX_ERR_NO_MEMORY);
1510
1511                std::mem::drop(proxy);
1512            }
1513        );
1514    }
1515
1516    #[fuchsia::test]
1517    async fn test_invalid_args() {
1518        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1519
1520        futures::join!(
1521            async {
1522                let block_server = BlockServer::new(
1523                    BLOCK_SIZE,
1524                    Arc::new(IoMockInterface {
1525                        return_errors: false,
1526                        do_checks: false,
1527                        expected_op: Arc::new(Mutex::new(None)),
1528                    }),
1529                );
1530                block_server.handle_requests(stream).await.unwrap();
1531            },
1532            async move {
1533                let (session_proxy, server) = fidl::endpoints::create_proxy();
1534
1535                proxy.open_session(server).unwrap();
1536
1537                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1538                let vmo_id = session_proxy
1539                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1540                    .await
1541                    .unwrap()
1542                    .unwrap();
1543
1544                let mut fifo =
1545                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1546
1547                async fn test(
1548                    fifo: &mut fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
1549                    request: BlockFifoRequest,
1550                ) -> Result<(), zx::Status> {
1551                    let (mut reader, mut writer) = fifo.async_io();
1552                    writer.write_entries(&request).await.unwrap();
1553                    let mut response = BlockFifoResponse::default();
1554                    reader.read_entries(&mut response).await.unwrap();
1555                    zx::Status::ok(response.status)
1556                }
1557
1558                // READ
1559
1560                let good_read_request = || BlockFifoRequest {
1561                    command: BlockFifoCommand {
1562                        opcode: BlockOpcode::Read.into_primitive(),
1563                        ..Default::default()
1564                    },
1565                    vmoid: vmo_id.id,
1566                    ..Default::default()
1567                };
1568
1569                assert_eq!(
1570                    test(
1571                        &mut fifo,
1572                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_read_request() }
1573                    )
1574                    .await,
1575                    Err(zx::Status::INVALID_ARGS)
1576                );
1577
1578                assert_eq!(
1579                    test(
1580                        &mut fifo,
1581                        BlockFifoRequest {
1582                            vmo_offset: 0xffff_ffff_ffff_ffff,
1583                            ..good_read_request()
1584                        }
1585                    )
1586                    .await,
1587                    Err(zx::Status::INVALID_ARGS)
1588                );
1589
1590                // WRITE
1591
1592                let good_write_request = || BlockFifoRequest {
1593                    command: BlockFifoCommand {
1594                        opcode: BlockOpcode::Write.into_primitive(),
1595                        ..Default::default()
1596                    },
1597                    vmoid: vmo_id.id,
1598                    ..Default::default()
1599                };
1600
1601                assert_eq!(
1602                    test(
1603                        &mut fifo,
1604                        BlockFifoRequest { vmoid: vmo_id.id + 1, ..good_write_request() }
1605                    )
1606                    .await,
1607                    Err(zx::Status::INVALID_ARGS)
1608                );
1609
1610                assert_eq!(
1611                    test(
1612                        &mut fifo,
1613                        BlockFifoRequest {
1614                            vmo_offset: 0xffff_ffff_ffff_ffff,
1615                            ..good_write_request()
1616                        }
1617                    )
1618                    .await,
1619                    Err(zx::Status::INVALID_ARGS)
1620                );
1621
1622                // CLOSE VMO
1623
1624                assert_eq!(
1625                    test(
1626                        &mut fifo,
1627                        BlockFifoRequest {
1628                            command: BlockFifoCommand {
1629                                opcode: BlockOpcode::CloseVmo.into_primitive(),
1630                                ..Default::default()
1631                            },
1632                            vmoid: vmo_id.id + 1,
1633                            ..Default::default()
1634                        }
1635                    )
1636                    .await,
1637                    Err(zx::Status::IO)
1638                );
1639
1640                std::mem::drop(proxy);
1641            }
1642        );
1643    }
1644
1645    #[fuchsia::test]
1646    async fn test_concurrent_requests() {
1647        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1648
1649        let waiting_readers = Arc::new(Mutex::new(Vec::new()));
1650        let waiting_readers_clone = waiting_readers.clone();
1651
1652        futures::join!(
1653            async move {
1654                let block_server = BlockServer::new(
1655                    BLOCK_SIZE,
1656                    Arc::new(MockInterface {
1657                        read_hook: Some(Box::new(move |dev_block_offset, _, _, _| {
1658                            let (tx, rx) = oneshot::channel();
1659                            waiting_readers_clone.lock().push((dev_block_offset as u32, tx));
1660                            Box::pin(async move {
1661                                let _ = rx.await;
1662                                Ok(())
1663                            })
1664                        })),
1665                    }),
1666                );
1667                block_server.handle_requests(stream).await.unwrap();
1668            },
1669            async move {
1670                let (session_proxy, server) = fidl::endpoints::create_proxy();
1671
1672                proxy.open_session(server).unwrap();
1673
1674                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1675                let vmo_id = session_proxy
1676                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1677                    .await
1678                    .unwrap()
1679                    .unwrap();
1680
1681                let mut fifo =
1682                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1683                let (mut reader, mut writer) = fifo.async_io();
1684
1685                writer
1686                    .write_entries(&BlockFifoRequest {
1687                        command: BlockFifoCommand {
1688                            opcode: BlockOpcode::Read.into_primitive(),
1689                            ..Default::default()
1690                        },
1691                        reqid: 1,
1692                        dev_offset: 1, // Intentionally use the same as `reqid`.
1693                        vmoid: vmo_id.id,
1694                        length: 1,
1695                        ..Default::default()
1696                    })
1697                    .await
1698                    .unwrap();
1699
1700                writer
1701                    .write_entries(&BlockFifoRequest {
1702                        command: BlockFifoCommand {
1703                            opcode: BlockOpcode::Read.into_primitive(),
1704                            ..Default::default()
1705                        },
1706                        reqid: 2,
1707                        dev_offset: 2,
1708                        vmoid: vmo_id.id,
1709                        length: 1,
1710                        ..Default::default()
1711                    })
1712                    .await
1713                    .unwrap();
1714
1715                // Wait till both those entries are pending.
1716                poll_fn(|cx: &mut Context<'_>| {
1717                    if waiting_readers.lock().len() == 2 {
1718                        Poll::Ready(())
1719                    } else {
1720                        // Yield to the executor.
1721                        cx.waker().wake_by_ref();
1722                        Poll::Pending
1723                    }
1724                })
1725                .await;
1726
1727                let mut response = BlockFifoResponse::default();
1728                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1729
1730                let (id, tx) = waiting_readers.lock().pop().unwrap();
1731                tx.send(()).unwrap();
1732
1733                reader.read_entries(&mut response).await.unwrap();
1734                assert_eq!(response.status, zx::sys::ZX_OK);
1735                assert_eq!(response.reqid, id);
1736
1737                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1738
1739                let (id, tx) = waiting_readers.lock().pop().unwrap();
1740                tx.send(()).unwrap();
1741
1742                reader.read_entries(&mut response).await.unwrap();
1743                assert_eq!(response.status, zx::sys::ZX_OK);
1744                assert_eq!(response.reqid, id);
1745            }
1746        );
1747    }
1748
1749    #[fuchsia::test]
1750    async fn test_groups() {
1751        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1752
1753        futures::join!(
1754            async move {
1755                let block_server = BlockServer::new(
1756                    BLOCK_SIZE,
1757                    Arc::new(MockInterface {
1758                        read_hook: Some(Box::new(move |_, _, _, _| Box::pin(async { Ok(()) }))),
1759                    }),
1760                );
1761                block_server.handle_requests(stream).await.unwrap();
1762            },
1763            async move {
1764                let (session_proxy, server) = fidl::endpoints::create_proxy();
1765
1766                proxy.open_session(server).unwrap();
1767
1768                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1769                let vmo_id = session_proxy
1770                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1771                    .await
1772                    .unwrap()
1773                    .unwrap();
1774
1775                let mut fifo =
1776                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1777                let (mut reader, mut writer) = fifo.async_io();
1778
1779                writer
1780                    .write_entries(&BlockFifoRequest {
1781                        command: BlockFifoCommand {
1782                            opcode: BlockOpcode::Read.into_primitive(),
1783                            flags: BlockIoFlag::GROUP_ITEM.bits(),
1784                            ..Default::default()
1785                        },
1786                        group: 1,
1787                        vmoid: vmo_id.id,
1788                        length: 1,
1789                        ..Default::default()
1790                    })
1791                    .await
1792                    .unwrap();
1793
1794                writer
1795                    .write_entries(&BlockFifoRequest {
1796                        command: BlockFifoCommand {
1797                            opcode: BlockOpcode::Read.into_primitive(),
1798                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1799                            ..Default::default()
1800                        },
1801                        reqid: 2,
1802                        group: 1,
1803                        vmoid: vmo_id.id,
1804                        length: 1,
1805                        ..Default::default()
1806                    })
1807                    .await
1808                    .unwrap();
1809
1810                let mut response = BlockFifoResponse::default();
1811                reader.read_entries(&mut response).await.unwrap();
1812                assert_eq!(response.status, zx::sys::ZX_OK);
1813                assert_eq!(response.reqid, 2);
1814                assert_eq!(response.group, 1);
1815            }
1816        );
1817    }
1818
1819    #[fuchsia::test]
1820    async fn test_group_error() {
1821        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1822
1823        let counter = Arc::new(AtomicU64::new(0));
1824        let counter_clone = counter.clone();
1825
1826        futures::join!(
1827            async move {
1828                let block_server = BlockServer::new(
1829                    BLOCK_SIZE,
1830                    Arc::new(MockInterface {
1831                        read_hook: Some(Box::new(move |_, _, _, _| {
1832                            counter_clone.fetch_add(1, Ordering::Relaxed);
1833                            Box::pin(async { Err(zx::Status::BAD_STATE) })
1834                        })),
1835                    }),
1836                );
1837                block_server.handle_requests(stream).await.unwrap();
1838            },
1839            async move {
1840                let (session_proxy, server) = fidl::endpoints::create_proxy();
1841
1842                proxy.open_session(server).unwrap();
1843
1844                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1845                let vmo_id = session_proxy
1846                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1847                    .await
1848                    .unwrap()
1849                    .unwrap();
1850
1851                let mut fifo =
1852                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1853                let (mut reader, mut writer) = fifo.async_io();
1854
1855                writer
1856                    .write_entries(&BlockFifoRequest {
1857                        command: BlockFifoCommand {
1858                            opcode: BlockOpcode::Read.into_primitive(),
1859                            flags: BlockIoFlag::GROUP_ITEM.bits(),
1860                            ..Default::default()
1861                        },
1862                        group: 1,
1863                        vmoid: vmo_id.id,
1864                        length: 1,
1865                        ..Default::default()
1866                    })
1867                    .await
1868                    .unwrap();
1869
1870                // Wait until processed.
1871                poll_fn(|cx: &mut Context<'_>| {
1872                    if counter.load(Ordering::Relaxed) == 1 {
1873                        Poll::Ready(())
1874                    } else {
1875                        // Yield to the executor.
1876                        cx.waker().wake_by_ref();
1877                        Poll::Pending
1878                    }
1879                })
1880                .await;
1881
1882                let mut response = BlockFifoResponse::default();
1883                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1884
1885                writer
1886                    .write_entries(&BlockFifoRequest {
1887                        command: BlockFifoCommand {
1888                            opcode: BlockOpcode::Read.into_primitive(),
1889                            flags: BlockIoFlag::GROUP_ITEM.bits(),
1890                            ..Default::default()
1891                        },
1892                        group: 1,
1893                        vmoid: vmo_id.id,
1894                        length: 1,
1895                        ..Default::default()
1896                    })
1897                    .await
1898                    .unwrap();
1899
1900                writer
1901                    .write_entries(&BlockFifoRequest {
1902                        command: BlockFifoCommand {
1903                            opcode: BlockOpcode::Read.into_primitive(),
1904                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1905                            ..Default::default()
1906                        },
1907                        reqid: 2,
1908                        group: 1,
1909                        vmoid: vmo_id.id,
1910                        length: 1,
1911                        ..Default::default()
1912                    })
1913                    .await
1914                    .unwrap();
1915
1916                reader.read_entries(&mut response).await.unwrap();
1917                assert_eq!(response.status, zx::sys::ZX_ERR_BAD_STATE);
1918                assert_eq!(response.reqid, 2);
1919                assert_eq!(response.group, 1);
1920
1921                assert!(futures::poll!(pin!(reader.read_entries(&mut response))).is_pending());
1922
1923                // Only the first request should have been processed.
1924                assert_eq!(counter.load(Ordering::Relaxed), 1);
1925            }
1926        );
1927    }
1928
1929    #[fuchsia::test]
1930    async fn test_group_with_two_lasts() {
1931        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
1932
1933        let (tx, rx) = oneshot::channel();
1934
1935        futures::join!(
1936            async move {
1937                let rx = Mutex::new(Some(rx));
1938                let block_server = BlockServer::new(
1939                    BLOCK_SIZE,
1940                    Arc::new(MockInterface {
1941                        read_hook: Some(Box::new(move |_, _, _, _| {
1942                            let rx = rx.lock().take().unwrap();
1943                            Box::pin(async {
1944                                let _ = rx.await;
1945                                Ok(())
1946                            })
1947                        })),
1948                    }),
1949                );
1950                block_server.handle_requests(stream).await.unwrap();
1951            },
1952            async move {
1953                let (session_proxy, server) = fidl::endpoints::create_proxy();
1954
1955                proxy.open_session(server).unwrap();
1956
1957                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
1958                let vmo_id = session_proxy
1959                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
1960                    .await
1961                    .unwrap()
1962                    .unwrap();
1963
1964                let mut fifo =
1965                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
1966                let (mut reader, mut writer) = fifo.async_io();
1967
1968                writer
1969                    .write_entries(&BlockFifoRequest {
1970                        command: BlockFifoCommand {
1971                            opcode: BlockOpcode::Read.into_primitive(),
1972                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1973                            ..Default::default()
1974                        },
1975                        reqid: 1,
1976                        group: 1,
1977                        vmoid: vmo_id.id,
1978                        length: 1,
1979                        ..Default::default()
1980                    })
1981                    .await
1982                    .unwrap();
1983
1984                writer
1985                    .write_entries(&BlockFifoRequest {
1986                        command: BlockFifoCommand {
1987                            opcode: BlockOpcode::Read.into_primitive(),
1988                            flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
1989                            ..Default::default()
1990                        },
1991                        reqid: 2,
1992                        group: 1,
1993                        vmoid: vmo_id.id,
1994                        length: 1,
1995                        ..Default::default()
1996                    })
1997                    .await
1998                    .unwrap();
1999
2000                // Send an independent request to flush through the fifo.
2001                writer
2002                    .write_entries(&BlockFifoRequest {
2003                        command: BlockFifoCommand {
2004                            opcode: BlockOpcode::CloseVmo.into_primitive(),
2005                            ..Default::default()
2006                        },
2007                        reqid: 3,
2008                        vmoid: vmo_id.id,
2009                        ..Default::default()
2010                    })
2011                    .await
2012                    .unwrap();
2013
2014                // It should succeed.
2015                let mut response = BlockFifoResponse::default();
2016                reader.read_entries(&mut response).await.unwrap();
2017                assert_eq!(response.status, zx::sys::ZX_OK);
2018                assert_eq!(response.reqid, 3);
2019
2020                // Now release the original request.
2021                tx.send(()).unwrap();
2022
2023                // The response should be for the first message tagged as last, and it should be
2024                // an error because we sent two messages with the LAST marker.
2025                let mut response = BlockFifoResponse::default();
2026                reader.read_entries(&mut response).await.unwrap();
2027                assert_eq!(response.status, zx::sys::ZX_ERR_INVALID_ARGS);
2028                assert_eq!(response.reqid, 1);
2029                assert_eq!(response.group, 1);
2030            }
2031        );
2032    }
2033
2034    #[fuchsia::test(allow_stalls = false)]
2035    async fn test_requests_dont_block_sessions() {
2036        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2037
2038        let (tx, rx) = oneshot::channel();
2039
2040        fasync::Task::local(async move {
2041            let rx = Mutex::new(Some(rx));
2042            let block_server = BlockServer::new(
2043                BLOCK_SIZE,
2044                Arc::new(MockInterface {
2045                    read_hook: Some(Box::new(move |_, _, _, _| {
2046                        let rx = rx.lock().take().unwrap();
2047                        Box::pin(async {
2048                            let _ = rx.await;
2049                            Ok(())
2050                        })
2051                    })),
2052                }),
2053            );
2054            block_server.handle_requests(stream).await.unwrap();
2055        })
2056        .detach();
2057
2058        let mut fut = pin!(async {
2059            let (session_proxy, server) = fidl::endpoints::create_proxy();
2060
2061            proxy.open_session(server).unwrap();
2062
2063            let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2064            let vmo_id = session_proxy
2065                .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2066                .await
2067                .unwrap()
2068                .unwrap();
2069
2070            let mut fifo =
2071                fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2072            let (mut reader, mut writer) = fifo.async_io();
2073
2074            writer
2075                .write_entries(&BlockFifoRequest {
2076                    command: BlockFifoCommand {
2077                        opcode: BlockOpcode::Read.into_primitive(),
2078                        flags: (BlockIoFlag::GROUP_ITEM | BlockIoFlag::GROUP_LAST).bits(),
2079                        ..Default::default()
2080                    },
2081                    reqid: 1,
2082                    group: 1,
2083                    vmoid: vmo_id.id,
2084                    length: 1,
2085                    ..Default::default()
2086                })
2087                .await
2088                .unwrap();
2089
2090            let mut response = BlockFifoResponse::default();
2091            reader.read_entries(&mut response).await.unwrap();
2092            assert_eq!(response.status, zx::sys::ZX_OK);
2093        });
2094
2095        // The response won't come back until we send on `tx`.
2096        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_pending());
2097
2098        let mut fut2 = pin!(proxy.get_volume_info());
2099
2100        // get_volume_info is set up to stall forever.
2101        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut2).await.is_pending());
2102
2103        // If we now free up the first future, it should resolve; the stalled call to
2104        // get_volume_info should not block the fifo response.
2105        let _ = tx.send(());
2106
2107        assert!(fasync::TestExecutor::poll_until_stalled(&mut fut).await.is_ready());
2108    }
2109
2110    #[fuchsia::test]
2111    async fn test_request_flow_control() {
2112        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2113
2114        // The client will ensure that MAX_REQUESTS are queued up before firing `event`, and the
2115        // server will block until that happens.
2116        const MAX_REQUESTS: u64 = FIFO_MAX_REQUESTS as u64;
2117        let event = Arc::new((event_listener::Event::new(), AtomicBool::new(false)));
2118        let event_clone = event.clone();
2119        futures::join!(
2120            async move {
2121                let block_server = BlockServer::new(
2122                    BLOCK_SIZE,
2123                    Arc::new(MockInterface {
2124                        read_hook: Some(Box::new(move |_, _, _, _| {
2125                            let event_clone = event_clone.clone();
2126                            Box::pin(async move {
2127                                if !event_clone.1.load(Ordering::SeqCst) {
2128                                    event_clone.0.listen().await;
2129                                }
2130                                Ok(())
2131                            })
2132                        })),
2133                    }),
2134                );
2135                block_server.handle_requests(stream).await.unwrap();
2136            },
2137            async move {
2138                let (session_proxy, server) = fidl::endpoints::create_proxy();
2139
2140                proxy.open_session(server).unwrap();
2141
2142                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2143                let vmo_id = session_proxy
2144                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2145                    .await
2146                    .unwrap()
2147                    .unwrap();
2148
2149                let mut fifo =
2150                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2151                let (mut reader, mut writer) = fifo.async_io();
2152
2153                for i in 0..MAX_REQUESTS {
2154                    writer
2155                        .write_entries(&BlockFifoRequest {
2156                            command: BlockFifoCommand {
2157                                opcode: BlockOpcode::Read.into_primitive(),
2158                                ..Default::default()
2159                            },
2160                            reqid: (i + 1) as u32,
2161                            dev_offset: i,
2162                            vmoid: vmo_id.id,
2163                            length: 1,
2164                            ..Default::default()
2165                        })
2166                        .await
2167                        .unwrap();
2168                }
2169                assert!(futures::poll!(pin!(writer.write_entries(&BlockFifoRequest {
2170                    command: BlockFifoCommand {
2171                        opcode: BlockOpcode::Read.into_primitive(),
2172                        ..Default::default()
2173                    },
2174                    reqid: u32::MAX,
2175                    dev_offset: MAX_REQUESTS,
2176                    vmoid: vmo_id.id,
2177                    length: 1,
2178                    ..Default::default()
2179                })))
2180                .is_pending());
2181                // OK, let the server start to process.
2182                event.1.store(true, Ordering::SeqCst);
2183                event.0.notify(usize::MAX);
2184                // For each entry we read, make sure we can write a new one in.
2185                let mut finished_reqids = vec![];
2186                for i in MAX_REQUESTS..2 * MAX_REQUESTS {
2187                    let mut response = BlockFifoResponse::default();
2188                    reader.read_entries(&mut response).await.unwrap();
2189                    assert_eq!(response.status, zx::sys::ZX_OK);
2190                    finished_reqids.push(response.reqid);
2191                    writer
2192                        .write_entries(&BlockFifoRequest {
2193                            command: BlockFifoCommand {
2194                                opcode: BlockOpcode::Read.into_primitive(),
2195                                ..Default::default()
2196                            },
2197                            reqid: (i + 1) as u32,
2198                            dev_offset: i,
2199                            vmoid: vmo_id.id,
2200                            length: 1,
2201                            ..Default::default()
2202                        })
2203                        .await
2204                        .unwrap();
2205                }
2206                let mut response = BlockFifoResponse::default();
2207                for _ in 0..MAX_REQUESTS {
2208                    reader.read_entries(&mut response).await.unwrap();
2209                    assert_eq!(response.status, zx::sys::ZX_OK);
2210                    finished_reqids.push(response.reqid);
2211                }
2212                // Verify that we got a response for each request.  Note that we can't assume FIFO
2213                // ordering.
2214                finished_reqids.sort();
2215                assert_eq!(finished_reqids.len(), 2 * MAX_REQUESTS as usize);
2216                let mut i = 1;
2217                for reqid in finished_reqids {
2218                    assert_eq!(reqid, i);
2219                    i += 1;
2220                }
2221            }
2222        );
2223    }
2224
2225    #[fuchsia::test]
2226    async fn test_passthrough_io_with_fixed_map() {
2227        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<fvolume::VolumeMarker>();
2228
2229        let expected_op = Arc::new(Mutex::new(None));
2230        let expected_op_clone = expected_op.clone();
2231        futures::join!(
2232            async {
2233                let block_server = BlockServer::new(
2234                    BLOCK_SIZE,
2235                    Arc::new(IoMockInterface {
2236                        return_errors: false,
2237                        do_checks: true,
2238                        expected_op: expected_op_clone,
2239                    }),
2240                );
2241                block_server.handle_requests(stream).await.unwrap();
2242            },
2243            async move {
2244                let (session_proxy, server) = fidl::endpoints::create_proxy();
2245
2246                let mappings = [fblock::BlockOffsetMapping {
2247                    source_block_offset: 0,
2248                    target_block_offset: 10,
2249                    length: 20,
2250                }];
2251                proxy.open_session_with_offset_map(server, None, Some(&mappings[..])).unwrap();
2252
2253                let vmo = zx::Vmo::create(zx::system_get_page_size() as u64).unwrap();
2254                let vmo_id = session_proxy
2255                    .attach_vmo(vmo.duplicate_handle(zx::Rights::SAME_RIGHTS).unwrap())
2256                    .await
2257                    .unwrap()
2258                    .unwrap();
2259
2260                let mut fifo =
2261                    fasync::Fifo::from_fifo(session_proxy.get_fifo().await.unwrap().unwrap());
2262                let (mut reader, mut writer) = fifo.async_io();
2263
2264                // READ
2265                *expected_op.lock() = Some(ExpectedOp::Read(11, 2, 3));
2266                writer
2267                    .write_entries(&BlockFifoRequest {
2268                        command: BlockFifoCommand {
2269                            opcode: BlockOpcode::Read.into_primitive(),
2270                            ..Default::default()
2271                        },
2272                        vmoid: vmo_id.id,
2273                        dev_offset: 1,
2274                        length: 2,
2275                        vmo_offset: 3,
2276                        ..Default::default()
2277                    })
2278                    .await
2279                    .unwrap();
2280
2281                let mut response = BlockFifoResponse::default();
2282                reader.read_entries(&mut response).await.unwrap();
2283                assert_eq!(response.status, zx::sys::ZX_OK);
2284
2285                // WRITE
2286                *expected_op.lock() = Some(ExpectedOp::Write(14, 5, 6));
2287                writer
2288                    .write_entries(&BlockFifoRequest {
2289                        command: BlockFifoCommand {
2290                            opcode: BlockOpcode::Write.into_primitive(),
2291                            ..Default::default()
2292                        },
2293                        vmoid: vmo_id.id,
2294                        dev_offset: 4,
2295                        length: 5,
2296                        vmo_offset: 6,
2297                        ..Default::default()
2298                    })
2299                    .await
2300                    .unwrap();
2301
2302                reader.read_entries(&mut response).await.unwrap();
2303                assert_eq!(response.status, zx::sys::ZX_OK);
2304
2305                // FLUSH
2306                *expected_op.lock() = Some(ExpectedOp::Flush);
2307                writer
2308                    .write_entries(&BlockFifoRequest {
2309                        command: BlockFifoCommand {
2310                            opcode: BlockOpcode::Flush.into_primitive(),
2311                            ..Default::default()
2312                        },
2313                        ..Default::default()
2314                    })
2315                    .await
2316                    .unwrap();
2317
2318                reader.read_entries(&mut response).await.unwrap();
2319                assert_eq!(response.status, zx::sys::ZX_OK);
2320
2321                // TRIM
2322                *expected_op.lock() = Some(ExpectedOp::Trim(17, 3));
2323                writer
2324                    .write_entries(&BlockFifoRequest {
2325                        command: BlockFifoCommand {
2326                            opcode: BlockOpcode::Trim.into_primitive(),
2327                            ..Default::default()
2328                        },
2329                        dev_offset: 7,
2330                        length: 3,
2331                        ..Default::default()
2332                    })
2333                    .await
2334                    .unwrap();
2335
2336                reader.read_entries(&mut response).await.unwrap();
2337                assert_eq!(response.status, zx::sys::ZX_OK);
2338
2339                // READ past window
2340                *expected_op.lock() = None;
2341                writer
2342                    .write_entries(&BlockFifoRequest {
2343                        command: BlockFifoCommand {
2344                            opcode: BlockOpcode::Read.into_primitive(),
2345                            ..Default::default()
2346                        },
2347                        vmoid: vmo_id.id,
2348                        dev_offset: 19,
2349                        length: 2,
2350                        vmo_offset: 3,
2351                        ..Default::default()
2352                    })
2353                    .await
2354                    .unwrap();
2355
2356                reader.read_entries(&mut response).await.unwrap();
2357                assert_eq!(response.status, zx::sys::ZX_ERR_OUT_OF_RANGE);
2358
2359                std::mem::drop(proxy);
2360            }
2361        );
2362    }
2363}