Skip to main content

block_client/
lib.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! A Rust client library for the Fuchsia block protocol.
6//!
7//! This crate provides a low-level client for interacting with block devices over the
8//! `fuchsia.hardware.block` FIDL protocol and the FIFO interface for block I/O.
9//!
10//! See the [`BlockClient`] trait.
11
12use fidl_fuchsia_storage_block::{MAX_TRANSFER_UNBOUNDED, VMOID_INVALID};
13use fuchsia_sync::Mutex;
14use futures::channel::oneshot;
15use futures::executor::block_on;
16use std::borrow::Borrow;
17use std::collections::HashMap;
18use std::future::Future;
19use std::hash::{Hash, Hasher};
20use std::mem::MaybeUninit;
21use std::num::NonZero;
22use std::ops::{DerefMut, Range};
23use std::pin::Pin;
24use std::sync::atomic::{AtomicU16, Ordering};
25use std::sync::{Arc, LazyLock};
26use std::task::{Context, Poll, Waker};
27use zx::sys::zx_handle_t;
28use zx::{self as zx, HandleBased as _};
29use {fidl_fuchsia_storage_block as block, fuchsia_async as fasync, storage_trace as trace};
30
31pub use cache::Cache;
32
33pub use block::DeviceFlag as BlockDeviceFlag;
34
35pub use block_protocol::*;
36
37pub mod cache;
38
39const TEMP_VMO_SIZE: usize = 65536;
40
41/// If a trace flow ID isn't specified for requests, one will be generated.
42pub const NO_TRACE_ID: u64 = 0;
43
44pub use fidl_fuchsia_storage_block::{BlockIoFlag, BlockOpcode};
45
46fn fidl_to_status(error: fidl::Error) -> zx::Status {
47    match error {
48        fidl::Error::ClientChannelClosed { status, .. } => status,
49        _ => zx::Status::INTERNAL,
50    }
51}
52
53fn opcode_str(opcode: u8) -> &'static str {
54    match BlockOpcode::from_primitive(opcode) {
55        Some(BlockOpcode::Read) => "read",
56        Some(BlockOpcode::Write) => "write",
57        Some(BlockOpcode::Flush) => "flush",
58        Some(BlockOpcode::Trim) => "trim",
59        Some(BlockOpcode::CloseVmo) => "close_vmo",
60        None => "unknown",
61    }
62}
63
64// Generates a trace ID that will be unique across the system (as long as |request_id| isn't
65// reused within this process).
66fn generate_trace_flow_id(request_id: u32) -> u64 {
67    static SELF_HANDLE: LazyLock<zx_handle_t> =
68        LazyLock::new(|| fuchsia_runtime::process_self().raw_handle());
69    *SELF_HANDLE as u64 + (request_id as u64) << 32
70}
71
72pub enum BufferSlice<'a> {
73    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
74    Memory(&'a [u8]),
75}
76
77impl<'a> BufferSlice<'a> {
78    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
79        BufferSlice::VmoId { vmo_id, offset, length }
80    }
81}
82
83impl<'a> From<&'a [u8]> for BufferSlice<'a> {
84    fn from(buf: &'a [u8]) -> Self {
85        BufferSlice::Memory(buf)
86    }
87}
88
89pub enum MutableBufferSlice<'a> {
90    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
91    Memory(&'a mut [u8]),
92}
93
94impl<'a> MutableBufferSlice<'a> {
95    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
96        MutableBufferSlice::VmoId { vmo_id, offset, length }
97    }
98}
99
100impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
101    fn from(buf: &'a mut [u8]) -> Self {
102        MutableBufferSlice::Memory(buf)
103    }
104}
105
106#[derive(Default)]
107struct RequestState {
108    result: Option<zx::Status>,
109    waker: Option<Waker>,
110}
111
112#[derive(Default)]
113struct FifoState {
114    // The fifo.
115    fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,
116
117    // The next request ID to be used.
118    next_request_id: u32,
119
120    // A queue of messages to be sent on the fifo.
121    queue: std::collections::VecDeque<BlockFifoRequest>,
122
123    // Map from request ID to RequestState.
124    map: HashMap<u32, RequestState>,
125
126    // The waker for the FifoPoller.
127    poller_waker: Option<Waker>,
128
129    // If set, attach a barrier to the next write request
130    attach_barrier: bool,
131}
132
133impl FifoState {
134    fn terminate(&mut self) {
135        self.fifo.take();
136        for (_, request_state) in self.map.iter_mut() {
137            request_state.result.get_or_insert(zx::Status::CANCELED);
138            if let Some(waker) = request_state.waker.take() {
139                waker.wake();
140            }
141        }
142        if let Some(waker) = self.poller_waker.take() {
143            waker.wake();
144        }
145    }
146
147    // Returns true if polling should be terminated.
148    fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
149        let fifo = if let Some(fifo) = self.fifo.as_ref() {
150            fifo
151        } else {
152            return true;
153        };
154
155        loop {
156            let slice = self.queue.as_slices().0;
157            if slice.is_empty() {
158                return false;
159            }
160            match fifo.try_write(context, slice) {
161                Poll::Ready(Ok(sent)) => {
162                    self.queue.drain(0..sent);
163                }
164                Poll::Ready(Err(_)) => {
165                    self.terminate();
166                    return true;
167                }
168                Poll::Pending => {
169                    return false;
170                }
171            }
172        }
173    }
174}
175
176type FifoStateRef = Arc<Mutex<FifoState>>;
177
178// A future used for fifo responses.
179struct ResponseFuture {
180    request_id: u32,
181    fifo_state: FifoStateRef,
182}
183
184impl ResponseFuture {
185    fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
186        ResponseFuture { request_id, fifo_state }
187    }
188}
189
190impl Future for ResponseFuture {
191    type Output = Result<(), zx::Status>;
192
193    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
194        let mut state = self.fifo_state.lock();
195        let request_state = state.map.get_mut(&self.request_id).unwrap();
196        if let Some(result) = request_state.result {
197            Poll::Ready(result.into())
198        } else {
199            request_state.waker.replace(context.waker().clone());
200            Poll::Pending
201        }
202    }
203}
204
205impl Drop for ResponseFuture {
206    fn drop(&mut self) {
207        let mut state = self.fifo_state.lock();
208        state.map.remove(&self.request_id).unwrap();
209        update_outstanding_requests_counter(state.map.len());
210    }
211}
212
213/// Wraps a vmo-id. Will panic if you forget to detach.
214#[derive(Debug)]
215#[must_use]
216pub struct VmoId(AtomicU16);
217
218impl VmoId {
219    /// VmoIds will normally be vended by attach_vmo, but this might be used in some tests
220    pub fn new(id: u16) -> Self {
221        Self(AtomicU16::new(id))
222    }
223
224    /// Invalidates self and returns a new VmoId with the same underlying ID.
225    pub fn take(&self) -> Self {
226        Self(AtomicU16::new(self.0.swap(VMOID_INVALID, Ordering::Relaxed)))
227    }
228
229    pub fn is_valid(&self) -> bool {
230        self.id() != VMOID_INVALID
231    }
232
233    /// Takes the ID.  The caller assumes responsibility for detaching.
234    #[must_use]
235    pub fn into_id(self) -> u16 {
236        self.0.swap(VMOID_INVALID, Ordering::Relaxed)
237    }
238
239    pub fn id(&self) -> u16 {
240        self.0.load(Ordering::Relaxed)
241    }
242}
243
244impl PartialEq for VmoId {
245    fn eq(&self, other: &Self) -> bool {
246        self.id() == other.id()
247    }
248}
249
250impl Eq for VmoId {}
251
252impl Drop for VmoId {
253    fn drop(&mut self) {
254        assert_eq!(self.0.load(Ordering::Relaxed), VMOID_INVALID, "Did you forget to detach?");
255    }
256}
257
258impl Hash for VmoId {
259    fn hash<H: Hasher>(&self, state: &mut H) {
260        self.id().hash(state);
261    }
262}
263
264/// Represents a client connection to a block device. This is a simplified version of the block.fidl
265/// interface.
266/// Most users will use the RemoteBlockClient instantiation of this trait.
267pub trait BlockClient: Send + Sync {
268    /// Wraps AttachVmo from fuchsia.hardware.block::Block.
269    fn attach_vmo(&self, vmo: &zx::Vmo) -> impl Future<Output = Result<VmoId, zx::Status>> + Send;
270
271    /// Detaches the given vmo-id from the device.
272    fn detach_vmo(&self, vmo_id: VmoId) -> impl Future<Output = Result<(), zx::Status>> + Send;
273
274    /// Reads from the device at |device_offset| into the given buffer slice.
275    fn read_at(
276        &self,
277        buffer_slice: MutableBufferSlice<'_>,
278        device_offset: u64,
279    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
280        self.read_at_with_opts_traced(buffer_slice, device_offset, ReadOptions::default(), 0)
281    }
282
283    fn read_at_with_opts(
284        &self,
285        buffer_slice: MutableBufferSlice<'_>,
286        device_offset: u64,
287        opts: ReadOptions,
288    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
289        self.read_at_with_opts_traced(buffer_slice, device_offset, opts, 0)
290    }
291
292    fn read_at_with_opts_traced(
293        &self,
294        buffer_slice: MutableBufferSlice<'_>,
295        device_offset: u64,
296        opts: ReadOptions,
297        trace_flow_id: u64,
298    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
299
300    /// Writes the data in |buffer_slice| to the device.
301    fn write_at(
302        &self,
303        buffer_slice: BufferSlice<'_>,
304        device_offset: u64,
305    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
306        self.write_at_with_opts_traced(
307            buffer_slice,
308            device_offset,
309            WriteOptions::default(),
310            NO_TRACE_ID,
311        )
312    }
313
314    fn write_at_with_opts(
315        &self,
316        buffer_slice: BufferSlice<'_>,
317        device_offset: u64,
318        opts: WriteOptions,
319    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
320        self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID)
321    }
322
323    fn write_at_with_opts_traced(
324        &self,
325        buffer_slice: BufferSlice<'_>,
326        device_offset: u64,
327        opts: WriteOptions,
328        trace_flow_id: u64,
329    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
330
331    /// Trims the given range on the block device.
332    fn trim(
333        &self,
334        device_range: Range<u64>,
335    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
336        self.trim_traced(device_range, NO_TRACE_ID)
337    }
338
339    fn trim_traced(
340        &self,
341        device_range: Range<u64>,
342        trace_flow_id: u64,
343    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
344
345    /// Attaches a barrier to the next write sent to the underlying block device. This barrier
346    /// method is an alternative to setting the WriteOption::PRE_BARRIER on `write_at_with_opts`.
347    /// This method makes it easier to guarantee that the barrier is attached to the correct
348    /// write operation when subsequent write operations can get reordered.
349    fn barrier(&self);
350
351    fn flush(&self) -> impl Future<Output = Result<(), zx::Status>> + Send {
352        self.flush_traced(NO_TRACE_ID)
353    }
354
355    /// Sends a flush request to the underlying block device.
356    fn flush_traced(
357        &self,
358        trace_flow_id: u64,
359    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
360
361    /// Closes the fifo.
362    fn close(&self) -> impl Future<Output = Result<(), zx::Status>> + Send;
363
364    /// Returns the block size of the device.
365    fn block_size(&self) -> u32;
366
367    /// Returns the size, in blocks, of the device.
368    fn block_count(&self) -> u64;
369
370    /// Returns the maximum number of blocks which can be transferred in a single request.
371    fn max_transfer_blocks(&self) -> Option<NonZero<u32>>;
372
373    /// Returns the block flags reported by the device.
374    fn block_flags(&self) -> BlockDeviceFlag;
375
376    /// Returns true if the remote fifo is still connected.
377    fn is_connected(&self) -> bool;
378}
379
380struct Common {
381    block_size: u32,
382    block_count: u64,
383    max_transfer_blocks: Option<NonZero<u32>>,
384    block_flags: BlockDeviceFlag,
385    fifo_state: FifoStateRef,
386    temp_vmo: futures::lock::Mutex<zx::Vmo>,
387    temp_vmo_id: VmoId,
388}
389
390impl Common {
391    fn new(
392        fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
393        info: &block::BlockInfo,
394        temp_vmo: zx::Vmo,
395        temp_vmo_id: VmoId,
396    ) -> Self {
397        let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
398        fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
399        Self {
400            block_size: info.block_size,
401            block_count: info.block_count,
402            max_transfer_blocks: if info.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
403                NonZero::new(info.max_transfer_size / info.block_size)
404            } else {
405                None
406            },
407            block_flags: info.flags,
408            fifo_state,
409            temp_vmo: futures::lock::Mutex::new(temp_vmo),
410            temp_vmo_id,
411        }
412    }
413
414    fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
415        if bytes % self.block_size as u64 != 0 {
416            Err(zx::Status::INVALID_ARGS)
417        } else {
418            Ok(bytes / self.block_size as u64)
419        }
420    }
421
422    // Sends the request and waits for the response.
423    async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
424        let (request_id, trace_flow_id) = {
425            let mut state = self.fifo_state.lock();
426
427            let mut flags = BlockIoFlag::from_bits_retain(request.command.flags);
428            if BlockOpcode::from_primitive(request.command.opcode) == Some(BlockOpcode::Write)
429                && state.attach_barrier
430            {
431                flags |= BlockIoFlag::PRE_BARRIER;
432                request.command.flags = flags.bits();
433                state.attach_barrier = false;
434            }
435
436            if state.fifo.is_none() {
437                // Fifo has been closed.
438                return Err(zx::Status::CANCELED);
439            }
440            trace::duration!(
441                "storage",
442                "block_client::send::start",
443                "op" => opcode_str(request.command.opcode),
444                "len" => request.length * self.block_size
445            );
446            let request_id = state.next_request_id;
447            state.next_request_id = state.next_request_id.overflowing_add(1).0;
448            assert!(
449                state.map.insert(request_id, RequestState::default()).is_none(),
450                "request id in use!"
451            );
452            update_outstanding_requests_counter(state.map.len());
453            request.reqid = request_id;
454            if request.trace_flow_id == NO_TRACE_ID {
455                request.trace_flow_id = generate_trace_flow_id(request_id);
456            }
457            let trace_flow_id = request.trace_flow_id;
458            trace::flow_begin!("storage", "block_client::send", trace_flow_id.into());
459            state.queue.push_back(request);
460            if let Some(waker) = state.poller_waker.clone() {
461                state.poll_send_requests(&mut Context::from_waker(&waker));
462            }
463            (request_id, trace_flow_id)
464        };
465        ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
466        trace::duration!("storage", "block_client::send::end");
467        trace::flow_end!("storage", "block_client::send", trace_flow_id.into());
468        Ok(())
469    }
470
471    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
472        self.send(BlockFifoRequest {
473            command: BlockFifoCommand {
474                opcode: BlockOpcode::CloseVmo.into_primitive(),
475                flags: 0,
476                ..Default::default()
477            },
478            vmoid: vmo_id.into_id(),
479            ..Default::default()
480        })
481        .await
482    }
483
484    async fn read_at(
485        &self,
486        buffer_slice: MutableBufferSlice<'_>,
487        device_offset: u64,
488        opts: ReadOptions,
489        trace_flow_id: u64,
490    ) -> Result<(), zx::Status> {
491        let mut flags = BlockIoFlag::empty();
492
493        if opts.inline_crypto.is_enabled {
494            flags |= BlockIoFlag::INLINE_ENCRYPTION_ENABLED;
495        }
496
497        match buffer_slice {
498            MutableBufferSlice::VmoId { vmo_id, offset, length } => {
499                self.send(BlockFifoRequest {
500                    command: BlockFifoCommand {
501                        opcode: BlockOpcode::Read.into_primitive(),
502                        flags: flags.bits(),
503                        ..Default::default()
504                    },
505                    vmoid: vmo_id.id(),
506                    length: self
507                        .to_blocks(length)?
508                        .try_into()
509                        .map_err(|_| zx::Status::INVALID_ARGS)?,
510                    vmo_offset: self.to_blocks(offset)?,
511                    dev_offset: self.to_blocks(device_offset)?,
512                    trace_flow_id,
513                    dun: opts.inline_crypto.dun,
514                    slot: opts.inline_crypto.slot,
515                    ..Default::default()
516                })
517                .await?
518            }
519            MutableBufferSlice::Memory(mut slice) => {
520                let temp_vmo = self.temp_vmo.lock().await;
521                let mut device_block = self.to_blocks(device_offset)?;
522                loop {
523                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
524                    let block_count = self.to_blocks(to_do as u64)? as u32;
525                    self.send(BlockFifoRequest {
526                        command: BlockFifoCommand {
527                            opcode: BlockOpcode::Read.into_primitive(),
528                            flags: flags.bits(),
529                            ..Default::default()
530                        },
531                        vmoid: self.temp_vmo_id.id(),
532                        length: block_count,
533                        vmo_offset: 0,
534                        dev_offset: device_block,
535                        trace_flow_id,
536                        dun: opts.inline_crypto.dun,
537                        slot: opts.inline_crypto.slot,
538                        ..Default::default()
539                    })
540                    .await?;
541                    temp_vmo.read(&mut slice[..to_do], 0)?;
542                    if to_do == slice.len() {
543                        break;
544                    }
545                    device_block += block_count as u64;
546                    slice = &mut slice[to_do..];
547                }
548            }
549        }
550        Ok(())
551    }
552
553    async fn write_at(
554        &self,
555        buffer_slice: BufferSlice<'_>,
556        device_offset: u64,
557        opts: WriteOptions,
558        trace_flow_id: u64,
559    ) -> Result<(), zx::Status> {
560        let mut flags = BlockIoFlag::empty();
561
562        if opts.flags.contains(WriteFlags::FORCE_ACCESS) {
563            flags |= BlockIoFlag::FORCE_ACCESS;
564        }
565
566        if opts.flags.contains(WriteFlags::PRE_BARRIER) {
567            flags |= BlockIoFlag::PRE_BARRIER;
568        }
569
570        if opts.inline_crypto.is_enabled {
571            flags |= BlockIoFlag::INLINE_ENCRYPTION_ENABLED;
572        }
573
574        match buffer_slice {
575            BufferSlice::VmoId { vmo_id, offset, length } => {
576                self.send(BlockFifoRequest {
577                    command: BlockFifoCommand {
578                        opcode: BlockOpcode::Write.into_primitive(),
579                        flags: flags.bits(),
580                        ..Default::default()
581                    },
582                    vmoid: vmo_id.id(),
583                    length: self
584                        .to_blocks(length)?
585                        .try_into()
586                        .map_err(|_| zx::Status::INVALID_ARGS)?,
587                    vmo_offset: self.to_blocks(offset)?,
588                    dev_offset: self.to_blocks(device_offset)?,
589                    trace_flow_id,
590                    dun: opts.inline_crypto.dun,
591                    slot: opts.inline_crypto.slot,
592                    ..Default::default()
593                })
594                .await?;
595            }
596            BufferSlice::Memory(mut slice) => {
597                let temp_vmo = self.temp_vmo.lock().await;
598                let mut device_block = self.to_blocks(device_offset)?;
599                loop {
600                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
601                    let block_count = self.to_blocks(to_do as u64)? as u32;
602                    temp_vmo.write(&slice[..to_do], 0)?;
603                    self.send(BlockFifoRequest {
604                        command: BlockFifoCommand {
605                            opcode: BlockOpcode::Write.into_primitive(),
606                            flags: flags.bits(),
607                            ..Default::default()
608                        },
609                        vmoid: self.temp_vmo_id.id(),
610                        length: block_count,
611                        vmo_offset: 0,
612                        dev_offset: device_block,
613                        trace_flow_id,
614                        dun: opts.inline_crypto.dun,
615                        slot: opts.inline_crypto.slot,
616                        ..Default::default()
617                    })
618                    .await?;
619                    if to_do == slice.len() {
620                        break;
621                    }
622                    device_block += block_count as u64;
623                    slice = &slice[to_do..];
624                }
625            }
626        }
627        Ok(())
628    }
629
630    async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
631        let length = self.to_blocks(device_range.end - device_range.start)? as u32;
632        let dev_offset = self.to_blocks(device_range.start)?;
633        self.send(BlockFifoRequest {
634            command: BlockFifoCommand {
635                opcode: BlockOpcode::Trim.into_primitive(),
636                flags: 0,
637                ..Default::default()
638            },
639            vmoid: VMOID_INVALID,
640            length,
641            dev_offset,
642            trace_flow_id,
643            ..Default::default()
644        })
645        .await
646    }
647
648    async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
649        self.send(BlockFifoRequest {
650            command: BlockFifoCommand {
651                opcode: BlockOpcode::Flush.into_primitive(),
652                flags: 0,
653                ..Default::default()
654            },
655            vmoid: VMOID_INVALID,
656            trace_flow_id,
657            ..Default::default()
658        })
659        .await
660    }
661
662    fn barrier(&self) {
663        self.fifo_state.lock().attach_barrier = true;
664    }
665
666    fn block_size(&self) -> u32 {
667        self.block_size
668    }
669
670    fn block_count(&self) -> u64 {
671        self.block_count
672    }
673
674    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
675        self.max_transfer_blocks.clone()
676    }
677
678    fn block_flags(&self) -> BlockDeviceFlag {
679        self.block_flags
680    }
681
682    fn is_connected(&self) -> bool {
683        self.fifo_state.lock().fifo.is_some()
684    }
685}
686
687impl Drop for Common {
688    fn drop(&mut self) {
689        // It's OK to leak the VMO id because the server will dump all VMOs when the fifo is torn
690        // down.
691        let _ = self.temp_vmo_id.take().into_id();
692        self.fifo_state.lock().terminate();
693    }
694}
695
696// RemoteBlockClient is a BlockClient that communicates with a real block device over FIDL.
697pub struct RemoteBlockClient {
698    session: block::SessionProxy,
699    common: Common,
700}
701
702impl RemoteBlockClient {
703    /// Returns a connection to a remote block device via the given channel.
704    pub async fn new(remote: impl Borrow<block::BlockProxy>) -> Result<Self, zx::Status> {
705        let remote = remote.borrow();
706        let info =
707            remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
708        let (session, server) = fidl::endpoints::create_proxy();
709        let () = remote.open_session(server).map_err(fidl_to_status)?;
710        Self::from_session(info, session).await
711    }
712
713    pub async fn from_session(
714        info: block::BlockInfo,
715        session: block::SessionProxy,
716    ) -> Result<Self, zx::Status> {
717        let fifo =
718            session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
719        let fifo = fasync::Fifo::from_fifo(fifo);
720        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
721        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
722        let vmo_id =
723            session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
724        let vmo_id = VmoId::new(vmo_id.id);
725        Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
726    }
727}
728
729impl BlockClient for RemoteBlockClient {
730    async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
731        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
732        let vmo_id = self
733            .session
734            .attach_vmo(dup)
735            .await
736            .map_err(fidl_to_status)?
737            .map_err(zx::Status::from_raw)?;
738        Ok(VmoId::new(vmo_id.id))
739    }
740
741    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
742        self.common.detach_vmo(vmo_id).await
743    }
744
745    async fn read_at_with_opts_traced(
746        &self,
747        buffer_slice: MutableBufferSlice<'_>,
748        device_offset: u64,
749        opts: ReadOptions,
750        trace_flow_id: u64,
751    ) -> Result<(), zx::Status> {
752        self.common.read_at(buffer_slice, device_offset, opts, trace_flow_id).await
753    }
754
755    async fn write_at_with_opts_traced(
756        &self,
757        buffer_slice: BufferSlice<'_>,
758        device_offset: u64,
759        opts: WriteOptions,
760        trace_flow_id: u64,
761    ) -> Result<(), zx::Status> {
762        self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
763    }
764
765    async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
766        self.common.trim(range, trace_flow_id).await
767    }
768
769    async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
770        self.common.flush(trace_flow_id).await
771    }
772
773    fn barrier(&self) {
774        self.common.barrier()
775    }
776
777    async fn close(&self) -> Result<(), zx::Status> {
778        let () =
779            self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
780        Ok(())
781    }
782
783    fn block_size(&self) -> u32 {
784        self.common.block_size()
785    }
786
787    fn block_count(&self) -> u64 {
788        self.common.block_count()
789    }
790
791    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
792        self.common.max_transfer_blocks()
793    }
794
795    fn block_flags(&self) -> BlockDeviceFlag {
796        self.common.block_flags()
797    }
798
799    fn is_connected(&self) -> bool {
800        self.common.is_connected()
801    }
802}
803
804pub struct RemoteBlockClientSync {
805    session: block::SessionSynchronousProxy,
806    common: Common,
807}
808
809impl RemoteBlockClientSync {
810    /// Returns a connection to a remote block device via the given channel, but spawns a separate
811    /// thread for polling the fifo which makes it work in cases where no executor is configured for
812    /// the calling thread.
813    pub fn new(
814        client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
815    ) -> Result<Self, zx::Status> {
816        let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
817        let info = remote
818            .get_info(zx::MonotonicInstant::INFINITE)
819            .map_err(fidl_to_status)?
820            .map_err(zx::Status::from_raw)?;
821        let (client, server) = fidl::endpoints::create_endpoints();
822        let () = remote.open_session(server).map_err(fidl_to_status)?;
823        let session = block::SessionSynchronousProxy::new(client.into_channel());
824        let fifo = session
825            .get_fifo(zx::MonotonicInstant::INFINITE)
826            .map_err(fidl_to_status)?
827            .map_err(zx::Status::from_raw)?;
828        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
829        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
830        let vmo_id = session
831            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
832            .map_err(fidl_to_status)?
833            .map_err(zx::Status::from_raw)?;
834        let vmo_id = VmoId::new(vmo_id.id);
835
836        // The fifo needs to be instantiated from the thread that has the executor as that's where
837        // the fifo registers for notifications to be delivered.
838        let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
839        std::thread::spawn(move || {
840            let mut executor = fasync::LocalExecutor::default();
841            let fifo = fasync::Fifo::from_fifo(fifo);
842            let common = Common::new(fifo, &info, temp_vmo, vmo_id);
843            let fifo_state = common.fifo_state.clone();
844            let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
845            executor.run_singlethreaded(FifoPoller { fifo_state });
846        });
847        block_on(receiver).map_err(|_| zx::Status::CANCELED)?
848    }
849
850    pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
851        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
852        let vmo_id = self
853            .session
854            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
855            .map_err(fidl_to_status)?
856            .map_err(zx::Status::from_raw)?;
857        Ok(VmoId::new(vmo_id.id))
858    }
859
860    pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
861        block_on(self.common.detach_vmo(vmo_id))
862    }
863
864    pub fn read_at(
865        &self,
866        buffer_slice: MutableBufferSlice<'_>,
867        device_offset: u64,
868    ) -> Result<(), zx::Status> {
869        block_on(self.common.read_at(
870            buffer_slice,
871            device_offset,
872            ReadOptions::default(),
873            NO_TRACE_ID,
874        ))
875    }
876
877    pub fn write_at(
878        &self,
879        buffer_slice: BufferSlice<'_>,
880        device_offset: u64,
881    ) -> Result<(), zx::Status> {
882        block_on(self.common.write_at(
883            buffer_slice,
884            device_offset,
885            WriteOptions::default(),
886            NO_TRACE_ID,
887        ))
888    }
889
890    pub fn flush(&self) -> Result<(), zx::Status> {
891        block_on(self.common.flush(NO_TRACE_ID))
892    }
893
894    pub fn close(&self) -> Result<(), zx::Status> {
895        let () = self
896            .session
897            .close(zx::MonotonicInstant::INFINITE)
898            .map_err(fidl_to_status)?
899            .map_err(zx::Status::from_raw)?;
900        Ok(())
901    }
902
903    pub fn block_size(&self) -> u32 {
904        self.common.block_size()
905    }
906
907    pub fn block_count(&self) -> u64 {
908        self.common.block_count()
909    }
910
911    pub fn is_connected(&self) -> bool {
912        self.common.is_connected()
913    }
914}
915
916impl Drop for RemoteBlockClientSync {
917    fn drop(&mut self) {
918        // Ignore errors here as there is not much we can do about it.
919        let _ = self.close();
920    }
921}
922
923// FifoPoller is a future responsible for sending and receiving from the fifo.
924struct FifoPoller {
925    fifo_state: FifoStateRef,
926}
927
928impl Future for FifoPoller {
929    type Output = ();
930
931    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
932        let mut state_lock = self.fifo_state.lock();
933        let state = state_lock.deref_mut(); // So that we can split the borrow.
934
935        // Send requests.
936        if state.poll_send_requests(context) {
937            return Poll::Ready(());
938        }
939
940        // Receive responses.
941        let fifo = state.fifo.as_ref().unwrap(); // Safe because poll_send_requests checks.
942        loop {
943            let mut response = MaybeUninit::uninit();
944            match fifo.try_read(context, &mut response) {
945                Poll::Pending => {
946                    state.poller_waker = Some(context.waker().clone());
947                    return Poll::Pending;
948                }
949                Poll::Ready(Ok(_)) => {
950                    let response = unsafe { response.assume_init() };
951                    let request_id = response.reqid;
952                    // If the request isn't in the map, assume that it's a cancelled read.
953                    if let Some(request_state) = state.map.get_mut(&request_id) {
954                        request_state.result.replace(zx::Status::from_raw(response.status));
955                        if let Some(waker) = request_state.waker.take() {
956                            waker.wake();
957                        }
958                    }
959                }
960                Poll::Ready(Err(_)) => {
961                    state.terminate();
962                    return Poll::Ready(());
963                }
964            }
965        }
966    }
967}
968
969fn update_outstanding_requests_counter(outstanding: usize) {
970    trace::counter!("storage", "block-requests", 0, "outstanding" => outstanding);
971}
972
973#[cfg(test)]
974mod tests {
975    use super::{
976        BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
977        RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
978    };
979    use block_protocol::ReadOptions;
980    use block_server::{BlockServer, DeviceInfo, PartitionInfo};
981    use fidl::endpoints::RequestStream as _;
982    use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
983    use futures::join;
984    use futures::stream::StreamExt as _;
985    use futures::stream::futures_unordered::FuturesUnordered;
986    use ramdevice_client::RamdiskClient;
987    use std::borrow::Cow;
988    use std::num::NonZero;
989    use std::sync::Arc;
990    use std::sync::atomic::{AtomicBool, Ordering};
991    use {fidl_fuchsia_storage_block as block, fuchsia_async as fasync};
992
993    const RAMDISK_BLOCK_SIZE: u64 = 1024;
994    const RAMDISK_BLOCK_COUNT: u64 = 1024;
995
996    pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
997        let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
998            .await
999            .expect("RamdiskClient::create failed");
1000        let client_end = ramdisk.open().expect("ramdisk.open failed");
1001        let proxy = client_end.into_proxy();
1002        let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1003        assert_eq!(block_client.block_size(), 1024);
1004        let client_end = ramdisk.open().expect("ramdisk.open failed");
1005        let proxy = client_end.into_proxy();
1006        (ramdisk, proxy, block_client)
1007    }
1008
1009    #[fuchsia::test]
1010    async fn test_against_ram_disk() {
1011        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1012
1013        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1014        vmo.write(b"hello", 5).expect("vmo.write failed");
1015        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1016        block_client
1017            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1018            .await
1019            .expect("write_at failed");
1020        block_client
1021            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
1022            .await
1023            .expect("read_at failed");
1024        let mut buf: [u8; 5] = Default::default();
1025        vmo.read(&mut buf, 1029).expect("vmo.read failed");
1026        assert_eq!(&buf, b"hello");
1027        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1028    }
1029
1030    #[fuchsia::test]
1031    async fn test_alignment() {
1032        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1033        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1034        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1035        block_client
1036            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
1037            .await
1038            .expect_err("expected failure due to bad alignment");
1039        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1040    }
1041
1042    #[fuchsia::test]
1043    async fn test_parallel_io() {
1044        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1045        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1046        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1047        let mut reads = Vec::new();
1048        for _ in 0..1024 {
1049            reads.push(
1050                block_client
1051                    .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1052                    .inspect_err(|e| panic!("read should have succeeded: {}", e)),
1053            );
1054        }
1055        futures::future::join_all(reads).await;
1056        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1057    }
1058
1059    #[fuchsia::test]
1060    async fn test_closed_device() {
1061        let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1062        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1063        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1064        let mut reads = Vec::new();
1065        for _ in 0..1024 {
1066            reads.push(
1067                block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1068            );
1069        }
1070        assert!(block_client.is_connected());
1071        let _ = futures::join!(futures::future::join_all(reads), async {
1072            ramdisk.destroy().await.expect("ramdisk.destroy failed")
1073        });
1074        // Destroying the ramdisk is asynchronous. Keep issuing reads until they start failing.
1075        while block_client
1076            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1077            .await
1078            .is_ok()
1079        {}
1080
1081        // Sometimes the FIFO will start rejecting requests before FIFO is actually closed, so we
1082        // get false-positives from is_connected.
1083        while block_client.is_connected() {
1084            // Sleep for a bit to minimise lock contention.
1085            fasync::Timer::new(fasync::MonotonicInstant::after(
1086                zx::MonotonicDuration::from_millis(500),
1087            ))
1088            .await;
1089        }
1090
1091        // But once is_connected goes negative, it should stay negative.
1092        assert_eq!(block_client.is_connected(), false);
1093        let _ = block_client.detach_vmo(vmo_id).await;
1094    }
1095
1096    #[fuchsia::test]
1097    async fn test_cancelled_reads() {
1098        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1099        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1100        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1101        {
1102            let mut reads = FuturesUnordered::new();
1103            for _ in 0..1024 {
1104                reads.push(
1105                    block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1106                );
1107            }
1108            // Read the first 500 results and then dump the rest.
1109            for _ in 0..500 {
1110                reads.next().await;
1111            }
1112        }
1113        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1114    }
1115
1116    #[fuchsia::test]
1117    async fn test_parallel_large_read_and_write_with_memory_succeds() {
1118        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1119        let block_client_ref = &block_client;
1120        let test_one = |offset, len, fill| async move {
1121            let buf = vec![fill; len];
1122            block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
1123            // Read back an extra block either side.
1124            let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
1125            block_client_ref
1126                .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
1127                .await
1128                .expect("read_at failed");
1129            assert_eq!(
1130                &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
1131                &[0; RAMDISK_BLOCK_SIZE as usize][..]
1132            );
1133            assert_eq!(
1134                &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
1135                &buf[..]
1136            );
1137            assert_eq!(
1138                &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
1139                &[0; RAMDISK_BLOCK_SIZE as usize][..]
1140            );
1141        };
1142        const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
1143        join!(
1144            test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
1145            test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
1146        );
1147    }
1148
1149    // Implements dummy server which can be used by test cases to verify whether
1150    // channel messages and fifo operations are being received - by using set_channel_handler or
1151    // set_fifo_hander respectively
1152    struct FakeBlockServer<'a> {
1153        server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
1154        channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
1155        fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
1156    }
1157
1158    impl<'a> FakeBlockServer<'a> {
1159        // Creates a new FakeBlockServer given a channel to listen on.
1160        //
1161        // 'channel_handler' and 'fifo_handler' closures allow for customizing the way how the server
1162        // handles requests received from channel or the fifo respectfully.
1163        //
1164        // 'channel_handler' receives a message before it is handled by the default implementation
1165        // and can return 'true' to indicate all processing is done and no further processing of
1166        // that message is required
1167        //
1168        // 'fifo_handler' takes as input a BlockFifoRequest and produces a response which the
1169        // FakeBlockServer will send over the fifo.
1170        fn new(
1171            server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
1172            channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
1173            fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
1174        ) -> FakeBlockServer<'a> {
1175            FakeBlockServer {
1176                server_channel: Some(server_channel),
1177                channel_handler: Box::new(channel_handler),
1178                fifo_handler: Box::new(fifo_handler),
1179            }
1180        }
1181
1182        // Runs the server
1183        async fn run(&mut self) {
1184            let server = self.server_channel.take().unwrap();
1185
1186            // Set up a mock server.
1187            let (server_fifo, client_fifo) =
1188                zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
1189                    .expect("Fifo::create failed");
1190            let maybe_server_fifo = fuchsia_sync::Mutex::new(Some(client_fifo));
1191
1192            let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
1193            let fifo_future = Abortable::new(
1194                async {
1195                    let mut fifo = fasync::Fifo::from_fifo(server_fifo);
1196                    let (mut reader, mut writer) = fifo.async_io();
1197                    let mut request = BlockFifoRequest::default();
1198                    loop {
1199                        match reader.read_entries(&mut request).await {
1200                            Ok(1) => {}
1201                            Err(zx::Status::PEER_CLOSED) => break,
1202                            Err(e) => panic!("read_entry failed {:?}", e),
1203                            _ => unreachable!(),
1204                        };
1205
1206                        let response = self.fifo_handler.as_ref()(request);
1207                        writer
1208                            .write_entries(std::slice::from_ref(&response))
1209                            .await
1210                            .expect("write_entries failed");
1211                    }
1212                },
1213                fifo_future_abort_registration,
1214            );
1215
1216            let channel_future = async {
1217                server
1218                    .into_stream()
1219                    .for_each_concurrent(None, |request| async {
1220                        let request = request.expect("unexpected fidl error");
1221
1222                        match request {
1223                            block::BlockRequest::GetInfo { responder } => {
1224                                responder
1225                                    .send(Ok(&block::BlockInfo {
1226                                        block_count: 1024,
1227                                        block_size: 512,
1228                                        max_transfer_size: 1024 * 1024,
1229                                        flags: block::DeviceFlag::empty(),
1230                                    }))
1231                                    .expect("send failed");
1232                            }
1233                            block::BlockRequest::OpenSession { session, control_handle: _ } => {
1234                                let stream = session.into_stream();
1235                                stream
1236                                    .for_each(|request| async {
1237                                        let request = request.expect("unexpected fidl error");
1238                                        // Give a chance for the test to register and potentially
1239                                        // handle the event
1240                                        if self.channel_handler.as_ref()(&request) {
1241                                            return;
1242                                        }
1243                                        match request {
1244                                            block::SessionRequest::GetFifo { responder } => {
1245                                                match maybe_server_fifo.lock().take() {
1246                                                    Some(fifo) => {
1247                                                        responder.send(Ok(fifo.downcast()))
1248                                                    }
1249                                                    None => responder.send(Err(
1250                                                        zx::Status::NO_RESOURCES.into_raw(),
1251                                                    )),
1252                                                }
1253                                                .expect("send failed")
1254                                            }
1255                                            block::SessionRequest::AttachVmo {
1256                                                vmo: _,
1257                                                responder,
1258                                            } => responder
1259                                                .send(Ok(&block::VmoId { id: 1 }))
1260                                                .expect("send failed"),
1261                                            block::SessionRequest::Close { responder } => {
1262                                                fifo_future_abort.abort();
1263                                                responder.send(Ok(())).expect("send failed")
1264                                            }
1265                                        }
1266                                    })
1267                                    .await
1268                            }
1269                            _ => panic!("Unexpected message"),
1270                        }
1271                    })
1272                    .await;
1273            };
1274
1275            let _result = join!(fifo_future, channel_future);
1276            //_result can be Err(Aborted) since FifoClose calls .abort but that's expected
1277        }
1278    }
1279
1280    #[fuchsia::test]
1281    async fn test_block_close_is_called() {
1282        let close_called = fuchsia_sync::Mutex::new(false);
1283        let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();
1284
1285        std::thread::spawn(move || {
1286            let _block_client =
1287                RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
1288            // The drop here should cause Close to be sent.
1289        });
1290
1291        let channel_handler = |request: &block::SessionRequest| -> bool {
1292            if let block::SessionRequest::Close { .. } = request {
1293                *close_called.lock() = true;
1294            }
1295            false
1296        };
1297        FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;
1298
1299        // After the server has finished running, we can check to see that close was called.
1300        assert!(*close_called.lock());
1301    }
1302
1303    #[fuchsia::test]
1304    async fn test_block_flush_is_called() {
1305        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();
1306
1307        struct Interface {
1308            flush_called: Arc<AtomicBool>,
1309        }
1310        impl block_server::async_interface::Interface for Interface {
1311            fn get_info(&self) -> Cow<'_, DeviceInfo> {
1312                Cow::Owned(DeviceInfo::Partition(PartitionInfo {
1313                    device_flags: fidl_fuchsia_storage_block::DeviceFlag::empty(),
1314                    max_transfer_blocks: None,
1315                    block_range: Some(0..1000),
1316                    type_guid: [0; 16],
1317                    instance_guid: [0; 16],
1318                    name: "foo".to_string(),
1319                    flags: 0,
1320                }))
1321            }
1322
1323            async fn read(
1324                &self,
1325                _device_block_offset: u64,
1326                _block_count: u32,
1327                _vmo: &Arc<zx::Vmo>,
1328                _vmo_offset: u64,
1329                _opts: ReadOptions,
1330                _trace_flow_id: Option<NonZero<u64>>,
1331            ) -> Result<(), zx::Status> {
1332                unreachable!();
1333            }
1334
1335            async fn write(
1336                &self,
1337                _device_block_offset: u64,
1338                _block_count: u32,
1339                _vmo: &Arc<zx::Vmo>,
1340                _vmo_offset: u64,
1341                _opts: WriteOptions,
1342                _trace_flow_id: Option<NonZero<u64>>,
1343            ) -> Result<(), zx::Status> {
1344                unreachable!();
1345            }
1346
1347            async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1348                self.flush_called.store(true, Ordering::Relaxed);
1349                Ok(())
1350            }
1351
1352            async fn trim(
1353                &self,
1354                _device_block_offset: u64,
1355                _block_count: u32,
1356                _trace_flow_id: Option<NonZero<u64>>,
1357            ) -> Result<(), zx::Status> {
1358                unreachable!();
1359            }
1360        }
1361
1362        let flush_called = Arc::new(AtomicBool::new(false));
1363
1364        futures::join!(
1365            async {
1366                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1367
1368                block_client.flush().await.expect("flush failed");
1369            },
1370            async {
1371                let block_server = BlockServer::new(
1372                    512,
1373                    Arc::new(Interface { flush_called: flush_called.clone() }),
1374                );
1375                block_server.handle_requests(stream.cast_stream()).await.unwrap();
1376            }
1377        );
1378
1379        assert!(flush_called.load(Ordering::Relaxed));
1380    }
1381
1382    #[fuchsia::test]
1383    async fn test_trace_flow_ids_set() {
1384        let (proxy, server) = fidl::endpoints::create_proxy();
1385
1386        futures::join!(
1387            async {
1388                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1389                block_client.flush().await.expect("flush failed");
1390            },
1391            async {
1392                let flow_id: fuchsia_sync::Mutex<Option<u64>> = fuchsia_sync::Mutex::new(None);
1393                let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
1394                    if request.trace_flow_id > 0 {
1395                        *flow_id.lock() = Some(request.trace_flow_id);
1396                    }
1397                    BlockFifoResponse {
1398                        status: zx::Status::OK.into_raw(),
1399                        reqid: request.reqid,
1400                        ..Default::default()
1401                    }
1402                };
1403                FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
1404                // After the server has finished running, verify the trace flow ID was set to some value.
1405                assert!(flow_id.lock().is_some());
1406            }
1407        );
1408    }
1409}