block_client/
lib.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! A Rust client library for the Fuchsia block protocol.
6//!
7//! This crate provides a low-level client for interacting with block devices over the
8//! `fuchsia.hardware.block` FIDL protocol and the FIFO interface for block I/O.
9//!
10//! See the [`BlockClient`] trait.
11
12use fidl_fuchsia_storage_block::{MAX_TRANSFER_UNBOUNDED, VMOID_INVALID};
13use fuchsia_sync::Mutex;
14use futures::channel::oneshot;
15use futures::executor::block_on;
16use std::borrow::Borrow;
17use std::collections::HashMap;
18use std::future::Future;
19use std::hash::{Hash, Hasher};
20use std::mem::MaybeUninit;
21use std::num::NonZero;
22use std::ops::{DerefMut, Range};
23use std::pin::Pin;
24use std::sync::atomic::{AtomicU16, Ordering};
25use std::sync::{Arc, LazyLock};
26use std::task::{Context, Poll, Waker};
27use zx::sys::zx_handle_t;
28use zx::{self as zx, HandleBased as _};
29use {fidl_fuchsia_storage_block as block, fuchsia_async as fasync, storage_trace as trace};
30
31pub use cache::Cache;
32
33pub use block::DeviceFlag as BlockDeviceFlag;
34
35pub use block_protocol::*;
36
37pub mod cache;
38
39const TEMP_VMO_SIZE: usize = 65536;
40
41/// If a trace flow ID isn't specified for requests, one will be generated.
42pub const NO_TRACE_ID: u64 = 0;
43
44pub use fidl_fuchsia_storage_block::{BlockIoFlag, BlockOpcode};
45
46fn fidl_to_status(error: fidl::Error) -> zx::Status {
47    match error {
48        fidl::Error::ClientChannelClosed { status, .. } => status,
49        _ => zx::Status::INTERNAL,
50    }
51}
52
53fn opcode_str(opcode: u8) -> &'static str {
54    match BlockOpcode::from_primitive(opcode) {
55        Some(BlockOpcode::Read) => "read",
56        Some(BlockOpcode::Write) => "write",
57        Some(BlockOpcode::Flush) => "flush",
58        Some(BlockOpcode::Trim) => "trim",
59        Some(BlockOpcode::CloseVmo) => "close_vmo",
60        None => "unknown",
61    }
62}
63
64// Generates a trace ID that will be unique across the system (as long as |request_id| isn't
65// reused within this process).
66fn generate_trace_flow_id(request_id: u32) -> u64 {
67    static SELF_HANDLE: LazyLock<zx_handle_t> =
68        LazyLock::new(|| fuchsia_runtime::process_self().raw_handle());
69    *SELF_HANDLE as u64 + (request_id as u64) << 32
70}
71
72pub enum BufferSlice<'a> {
73    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
74    Memory(&'a [u8]),
75}
76
77impl<'a> BufferSlice<'a> {
78    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
79        BufferSlice::VmoId { vmo_id, offset, length }
80    }
81}
82
83impl<'a> From<&'a [u8]> for BufferSlice<'a> {
84    fn from(buf: &'a [u8]) -> Self {
85        BufferSlice::Memory(buf)
86    }
87}
88
89pub enum MutableBufferSlice<'a> {
90    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
91    Memory(&'a mut [u8]),
92}
93
94impl<'a> MutableBufferSlice<'a> {
95    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
96        MutableBufferSlice::VmoId { vmo_id, offset, length }
97    }
98}
99
100impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
101    fn from(buf: &'a mut [u8]) -> Self {
102        MutableBufferSlice::Memory(buf)
103    }
104}
105
106#[derive(Default)]
107struct RequestState {
108    result: Option<zx::Status>,
109    waker: Option<Waker>,
110}
111
112#[derive(Default)]
113struct FifoState {
114    // The fifo.
115    fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,
116
117    // The next request ID to be used.
118    next_request_id: u32,
119
120    // A queue of messages to be sent on the fifo.
121    queue: std::collections::VecDeque<BlockFifoRequest>,
122
123    // Map from request ID to RequestState.
124    map: HashMap<u32, RequestState>,
125
126    // The waker for the FifoPoller.
127    poller_waker: Option<Waker>,
128
129    // If set, attach a barrier to the next write request
130    attach_barrier: bool,
131}
132
133impl FifoState {
134    fn terminate(&mut self) {
135        self.fifo.take();
136        for (_, request_state) in self.map.iter_mut() {
137            request_state.result.get_or_insert(zx::Status::CANCELED);
138            if let Some(waker) = request_state.waker.take() {
139                waker.wake();
140            }
141        }
142        if let Some(waker) = self.poller_waker.take() {
143            waker.wake();
144        }
145    }
146
147    // Returns true if polling should be terminated.
148    fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
149        let fifo = if let Some(fifo) = self.fifo.as_ref() {
150            fifo
151        } else {
152            return true;
153        };
154
155        loop {
156            let slice = self.queue.as_slices().0;
157            if slice.is_empty() {
158                return false;
159            }
160            match fifo.try_write(context, slice) {
161                Poll::Ready(Ok(sent)) => {
162                    self.queue.drain(0..sent);
163                }
164                Poll::Ready(Err(_)) => {
165                    self.terminate();
166                    return true;
167                }
168                Poll::Pending => {
169                    return false;
170                }
171            }
172        }
173    }
174}
175
176type FifoStateRef = Arc<Mutex<FifoState>>;
177
178// A future used for fifo responses.
179struct ResponseFuture {
180    request_id: u32,
181    fifo_state: FifoStateRef,
182}
183
184impl ResponseFuture {
185    fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
186        ResponseFuture { request_id, fifo_state }
187    }
188}
189
190impl Future for ResponseFuture {
191    type Output = Result<(), zx::Status>;
192
193    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
194        let mut state = self.fifo_state.lock();
195        let request_state = state.map.get_mut(&self.request_id).unwrap();
196        if let Some(result) = request_state.result {
197            Poll::Ready(result.into())
198        } else {
199            request_state.waker.replace(context.waker().clone());
200            Poll::Pending
201        }
202    }
203}
204
205impl Drop for ResponseFuture {
206    fn drop(&mut self) {
207        self.fifo_state.lock().map.remove(&self.request_id).unwrap();
208    }
209}
210
211/// Wraps a vmo-id. Will panic if you forget to detach.
212#[derive(Debug)]
213#[must_use]
214pub struct VmoId(AtomicU16);
215
216impl VmoId {
217    /// VmoIds will normally be vended by attach_vmo, but this might be used in some tests
218    pub fn new(id: u16) -> Self {
219        Self(AtomicU16::new(id))
220    }
221
222    /// Invalidates self and returns a new VmoId with the same underlying ID.
223    pub fn take(&self) -> Self {
224        Self(AtomicU16::new(self.0.swap(VMOID_INVALID, Ordering::Relaxed)))
225    }
226
227    pub fn is_valid(&self) -> bool {
228        self.id() != VMOID_INVALID
229    }
230
231    /// Takes the ID.  The caller assumes responsibility for detaching.
232    #[must_use]
233    pub fn into_id(self) -> u16 {
234        self.0.swap(VMOID_INVALID, Ordering::Relaxed)
235    }
236
237    pub fn id(&self) -> u16 {
238        self.0.load(Ordering::Relaxed)
239    }
240}
241
242impl PartialEq for VmoId {
243    fn eq(&self, other: &Self) -> bool {
244        self.id() == other.id()
245    }
246}
247
248impl Eq for VmoId {}
249
250impl Drop for VmoId {
251    fn drop(&mut self) {
252        assert_eq!(self.0.load(Ordering::Relaxed), VMOID_INVALID, "Did you forget to detach?");
253    }
254}
255
256impl Hash for VmoId {
257    fn hash<H: Hasher>(&self, state: &mut H) {
258        self.id().hash(state);
259    }
260}
261
262/// Represents a client connection to a block device. This is a simplified version of the block.fidl
263/// interface.
264/// Most users will use the RemoteBlockClient instantiation of this trait.
265pub trait BlockClient: Send + Sync {
266    /// Wraps AttachVmo from fuchsia.hardware.block::Block.
267    fn attach_vmo(&self, vmo: &zx::Vmo) -> impl Future<Output = Result<VmoId, zx::Status>> + Send;
268
269    /// Detaches the given vmo-id from the device.
270    fn detach_vmo(&self, vmo_id: VmoId) -> impl Future<Output = Result<(), zx::Status>> + Send;
271
272    /// Reads from the device at |device_offset| into the given buffer slice.
273    fn read_at(
274        &self,
275        buffer_slice: MutableBufferSlice<'_>,
276        device_offset: u64,
277    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
278        self.read_at_with_opts_traced(buffer_slice, device_offset, ReadOptions::default(), 0)
279    }
280
281    fn read_at_with_opts(
282        &self,
283        buffer_slice: MutableBufferSlice<'_>,
284        device_offset: u64,
285        opts: ReadOptions,
286    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
287        self.read_at_with_opts_traced(buffer_slice, device_offset, opts, 0)
288    }
289
290    fn read_at_with_opts_traced(
291        &self,
292        buffer_slice: MutableBufferSlice<'_>,
293        device_offset: u64,
294        opts: ReadOptions,
295        trace_flow_id: u64,
296    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
297
298    /// Writes the data in |buffer_slice| to the device.
299    fn write_at(
300        &self,
301        buffer_slice: BufferSlice<'_>,
302        device_offset: u64,
303    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
304        self.write_at_with_opts_traced(
305            buffer_slice,
306            device_offset,
307            WriteOptions::default(),
308            NO_TRACE_ID,
309        )
310    }
311
312    fn write_at_with_opts(
313        &self,
314        buffer_slice: BufferSlice<'_>,
315        device_offset: u64,
316        opts: WriteOptions,
317    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
318        self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID)
319    }
320
321    fn write_at_with_opts_traced(
322        &self,
323        buffer_slice: BufferSlice<'_>,
324        device_offset: u64,
325        opts: WriteOptions,
326        trace_flow_id: u64,
327    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
328
329    /// Trims the given range on the block device.
330    fn trim(
331        &self,
332        device_range: Range<u64>,
333    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
334        self.trim_traced(device_range, NO_TRACE_ID)
335    }
336
337    fn trim_traced(
338        &self,
339        device_range: Range<u64>,
340        trace_flow_id: u64,
341    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
342
343    /// Attaches a barrier to the next write sent to the underlying block device. This barrier
344    /// method is an alternative to setting the WriteOption::PRE_BARRIER on `write_at_with_opts`.
345    /// This method makes it easier to guarantee that the barrier is attached to the correct
346    /// write operation when subsequent write operations can get reordered.
347    fn barrier(&self);
348
349    fn flush(&self) -> impl Future<Output = Result<(), zx::Status>> + Send {
350        self.flush_traced(NO_TRACE_ID)
351    }
352
353    /// Sends a flush request to the underlying block device.
354    fn flush_traced(
355        &self,
356        trace_flow_id: u64,
357    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
358
359    /// Closes the fifo.
360    fn close(&self) -> impl Future<Output = Result<(), zx::Status>> + Send;
361
362    /// Returns the block size of the device.
363    fn block_size(&self) -> u32;
364
365    /// Returns the size, in blocks, of the device.
366    fn block_count(&self) -> u64;
367
368    /// Returns the maximum number of blocks which can be transferred in a single request.
369    fn max_transfer_blocks(&self) -> Option<NonZero<u32>>;
370
371    /// Returns the block flags reported by the device.
372    fn block_flags(&self) -> BlockDeviceFlag;
373
374    /// Returns true if the remote fifo is still connected.
375    fn is_connected(&self) -> bool;
376}
377
378struct Common {
379    block_size: u32,
380    block_count: u64,
381    max_transfer_blocks: Option<NonZero<u32>>,
382    block_flags: BlockDeviceFlag,
383    fifo_state: FifoStateRef,
384    temp_vmo: futures::lock::Mutex<zx::Vmo>,
385    temp_vmo_id: VmoId,
386}
387
388impl Common {
389    fn new(
390        fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
391        info: &block::BlockInfo,
392        temp_vmo: zx::Vmo,
393        temp_vmo_id: VmoId,
394    ) -> Self {
395        let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
396        fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
397        Self {
398            block_size: info.block_size,
399            block_count: info.block_count,
400            max_transfer_blocks: if info.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
401                NonZero::new(info.max_transfer_size / info.block_size)
402            } else {
403                None
404            },
405            block_flags: info.flags,
406            fifo_state,
407            temp_vmo: futures::lock::Mutex::new(temp_vmo),
408            temp_vmo_id,
409        }
410    }
411
412    fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
413        if bytes % self.block_size as u64 != 0 {
414            Err(zx::Status::INVALID_ARGS)
415        } else {
416            Ok(bytes / self.block_size as u64)
417        }
418    }
419
420    // Sends the request and waits for the response.
421    async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
422        let (request_id, trace_flow_id) = {
423            let mut state = self.fifo_state.lock();
424
425            let mut flags = BlockIoFlag::from_bits_retain(request.command.flags);
426            if BlockOpcode::from_primitive(request.command.opcode) == Some(BlockOpcode::Write)
427                && state.attach_barrier
428            {
429                flags |= BlockIoFlag::PRE_BARRIER;
430                request.command.flags = flags.bits();
431                state.attach_barrier = false;
432            }
433
434            if state.fifo.is_none() {
435                // Fifo has been closed.
436                return Err(zx::Status::CANCELED);
437            }
438            trace::duration!(
439                "storage",
440                "block_client::send::start",
441                "op" => opcode_str(request.command.opcode),
442                "len" => request.length * self.block_size
443            );
444            let request_id = state.next_request_id;
445            state.next_request_id = state.next_request_id.overflowing_add(1).0;
446            assert!(
447                state.map.insert(request_id, RequestState::default()).is_none(),
448                "request id in use!"
449            );
450            request.reqid = request_id;
451            if request.trace_flow_id == NO_TRACE_ID {
452                request.trace_flow_id = generate_trace_flow_id(request_id);
453            }
454            let trace_flow_id = request.trace_flow_id;
455            trace::flow_begin!("storage", "block_client::send", trace_flow_id.into());
456            state.queue.push_back(request);
457            if let Some(waker) = state.poller_waker.clone() {
458                state.poll_send_requests(&mut Context::from_waker(&waker));
459            }
460            (request_id, trace_flow_id)
461        };
462        ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
463        trace::duration!("storage", "block_client::send::end");
464        trace::flow_end!("storage", "block_client::send", trace_flow_id.into());
465        Ok(())
466    }
467
468    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
469        self.send(BlockFifoRequest {
470            command: BlockFifoCommand {
471                opcode: BlockOpcode::CloseVmo.into_primitive(),
472                flags: 0,
473                ..Default::default()
474            },
475            vmoid: vmo_id.into_id(),
476            ..Default::default()
477        })
478        .await
479    }
480
481    async fn read_at(
482        &self,
483        buffer_slice: MutableBufferSlice<'_>,
484        device_offset: u64,
485        opts: ReadOptions,
486        trace_flow_id: u64,
487    ) -> Result<(), zx::Status> {
488        let mut flags = BlockIoFlag::empty();
489
490        if opts.inline_crypto.is_enabled {
491            flags |= BlockIoFlag::INLINE_ENCRYPTION_ENABLED;
492        }
493
494        match buffer_slice {
495            MutableBufferSlice::VmoId { vmo_id, offset, length } => {
496                self.send(BlockFifoRequest {
497                    command: BlockFifoCommand {
498                        opcode: BlockOpcode::Read.into_primitive(),
499                        flags: flags.bits(),
500                        ..Default::default()
501                    },
502                    vmoid: vmo_id.id(),
503                    length: self
504                        .to_blocks(length)?
505                        .try_into()
506                        .map_err(|_| zx::Status::INVALID_ARGS)?,
507                    vmo_offset: self.to_blocks(offset)?,
508                    dev_offset: self.to_blocks(device_offset)?,
509                    trace_flow_id,
510                    dun: opts.inline_crypto.dun,
511                    slot: opts.inline_crypto.slot,
512                    ..Default::default()
513                })
514                .await?
515            }
516            MutableBufferSlice::Memory(mut slice) => {
517                let temp_vmo = self.temp_vmo.lock().await;
518                let mut device_block = self.to_blocks(device_offset)?;
519                loop {
520                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
521                    let block_count = self.to_blocks(to_do as u64)? as u32;
522                    self.send(BlockFifoRequest {
523                        command: BlockFifoCommand {
524                            opcode: BlockOpcode::Read.into_primitive(),
525                            flags: flags.bits(),
526                            ..Default::default()
527                        },
528                        vmoid: self.temp_vmo_id.id(),
529                        length: block_count,
530                        vmo_offset: 0,
531                        dev_offset: device_block,
532                        trace_flow_id,
533                        dun: opts.inline_crypto.dun,
534                        slot: opts.inline_crypto.slot,
535                        ..Default::default()
536                    })
537                    .await?;
538                    temp_vmo.read(&mut slice[..to_do], 0)?;
539                    if to_do == slice.len() {
540                        break;
541                    }
542                    device_block += block_count as u64;
543                    slice = &mut slice[to_do..];
544                }
545            }
546        }
547        Ok(())
548    }
549
550    async fn write_at(
551        &self,
552        buffer_slice: BufferSlice<'_>,
553        device_offset: u64,
554        opts: WriteOptions,
555        trace_flow_id: u64,
556    ) -> Result<(), zx::Status> {
557        let mut flags = BlockIoFlag::empty();
558
559        if opts.flags.contains(WriteFlags::FORCE_ACCESS) {
560            flags |= BlockIoFlag::FORCE_ACCESS;
561        }
562
563        if opts.flags.contains(WriteFlags::PRE_BARRIER) {
564            flags |= BlockIoFlag::PRE_BARRIER;
565        }
566
567        if opts.inline_crypto.is_enabled {
568            flags |= BlockIoFlag::INLINE_ENCRYPTION_ENABLED;
569        }
570
571        match buffer_slice {
572            BufferSlice::VmoId { vmo_id, offset, length } => {
573                self.send(BlockFifoRequest {
574                    command: BlockFifoCommand {
575                        opcode: BlockOpcode::Write.into_primitive(),
576                        flags: flags.bits(),
577                        ..Default::default()
578                    },
579                    vmoid: vmo_id.id(),
580                    length: self
581                        .to_blocks(length)?
582                        .try_into()
583                        .map_err(|_| zx::Status::INVALID_ARGS)?,
584                    vmo_offset: self.to_blocks(offset)?,
585                    dev_offset: self.to_blocks(device_offset)?,
586                    trace_flow_id,
587                    dun: opts.inline_crypto.dun,
588                    slot: opts.inline_crypto.slot,
589                    ..Default::default()
590                })
591                .await?;
592            }
593            BufferSlice::Memory(mut slice) => {
594                let temp_vmo = self.temp_vmo.lock().await;
595                let mut device_block = self.to_blocks(device_offset)?;
596                loop {
597                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
598                    let block_count = self.to_blocks(to_do as u64)? as u32;
599                    temp_vmo.write(&slice[..to_do], 0)?;
600                    self.send(BlockFifoRequest {
601                        command: BlockFifoCommand {
602                            opcode: BlockOpcode::Write.into_primitive(),
603                            flags: flags.bits(),
604                            ..Default::default()
605                        },
606                        vmoid: self.temp_vmo_id.id(),
607                        length: block_count,
608                        vmo_offset: 0,
609                        dev_offset: device_block,
610                        trace_flow_id,
611                        dun: opts.inline_crypto.dun,
612                        slot: opts.inline_crypto.slot,
613                        ..Default::default()
614                    })
615                    .await?;
616                    if to_do == slice.len() {
617                        break;
618                    }
619                    device_block += block_count as u64;
620                    slice = &slice[to_do..];
621                }
622            }
623        }
624        Ok(())
625    }
626
627    async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
628        let length = self.to_blocks(device_range.end - device_range.start)? as u32;
629        let dev_offset = self.to_blocks(device_range.start)?;
630        self.send(BlockFifoRequest {
631            command: BlockFifoCommand {
632                opcode: BlockOpcode::Trim.into_primitive(),
633                flags: 0,
634                ..Default::default()
635            },
636            vmoid: VMOID_INVALID,
637            length,
638            dev_offset,
639            trace_flow_id,
640            ..Default::default()
641        })
642        .await
643    }
644
645    async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
646        self.send(BlockFifoRequest {
647            command: BlockFifoCommand {
648                opcode: BlockOpcode::Flush.into_primitive(),
649                flags: 0,
650                ..Default::default()
651            },
652            vmoid: VMOID_INVALID,
653            trace_flow_id,
654            ..Default::default()
655        })
656        .await
657    }
658
659    fn barrier(&self) {
660        self.fifo_state.lock().attach_barrier = true;
661    }
662
663    fn block_size(&self) -> u32 {
664        self.block_size
665    }
666
667    fn block_count(&self) -> u64 {
668        self.block_count
669    }
670
671    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
672        self.max_transfer_blocks.clone()
673    }
674
675    fn block_flags(&self) -> BlockDeviceFlag {
676        self.block_flags
677    }
678
679    fn is_connected(&self) -> bool {
680        self.fifo_state.lock().fifo.is_some()
681    }
682}
683
684impl Drop for Common {
685    fn drop(&mut self) {
686        // It's OK to leak the VMO id because the server will dump all VMOs when the fifo is torn
687        // down.
688        let _ = self.temp_vmo_id.take().into_id();
689        self.fifo_state.lock().terminate();
690    }
691}
692
693// RemoteBlockClient is a BlockClient that communicates with a real block device over FIDL.
694pub struct RemoteBlockClient {
695    session: block::SessionProxy,
696    common: Common,
697}
698
699impl RemoteBlockClient {
700    /// Returns a connection to a remote block device via the given channel.
701    pub async fn new(remote: impl Borrow<block::BlockProxy>) -> Result<Self, zx::Status> {
702        let remote = remote.borrow();
703        let info =
704            remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
705        let (session, server) = fidl::endpoints::create_proxy();
706        let () = remote.open_session(server).map_err(fidl_to_status)?;
707        Self::from_session(info, session).await
708    }
709
710    pub async fn from_session(
711        info: block::BlockInfo,
712        session: block::SessionProxy,
713    ) -> Result<Self, zx::Status> {
714        let fifo =
715            session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
716        let fifo = fasync::Fifo::from_fifo(fifo);
717        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
718        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
719        let vmo_id =
720            session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
721        let vmo_id = VmoId::new(vmo_id.id);
722        Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
723    }
724}
725
726impl BlockClient for RemoteBlockClient {
727    async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
728        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
729        let vmo_id = self
730            .session
731            .attach_vmo(dup)
732            .await
733            .map_err(fidl_to_status)?
734            .map_err(zx::Status::from_raw)?;
735        Ok(VmoId::new(vmo_id.id))
736    }
737
738    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
739        self.common.detach_vmo(vmo_id).await
740    }
741
742    async fn read_at_with_opts_traced(
743        &self,
744        buffer_slice: MutableBufferSlice<'_>,
745        device_offset: u64,
746        opts: ReadOptions,
747        trace_flow_id: u64,
748    ) -> Result<(), zx::Status> {
749        self.common.read_at(buffer_slice, device_offset, opts, trace_flow_id).await
750    }
751
752    async fn write_at_with_opts_traced(
753        &self,
754        buffer_slice: BufferSlice<'_>,
755        device_offset: u64,
756        opts: WriteOptions,
757        trace_flow_id: u64,
758    ) -> Result<(), zx::Status> {
759        self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
760    }
761
762    async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
763        self.common.trim(range, trace_flow_id).await
764    }
765
766    async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
767        self.common.flush(trace_flow_id).await
768    }
769
770    fn barrier(&self) {
771        self.common.barrier()
772    }
773
774    async fn close(&self) -> Result<(), zx::Status> {
775        let () =
776            self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
777        Ok(())
778    }
779
780    fn block_size(&self) -> u32 {
781        self.common.block_size()
782    }
783
784    fn block_count(&self) -> u64 {
785        self.common.block_count()
786    }
787
788    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
789        self.common.max_transfer_blocks()
790    }
791
792    fn block_flags(&self) -> BlockDeviceFlag {
793        self.common.block_flags()
794    }
795
796    fn is_connected(&self) -> bool {
797        self.common.is_connected()
798    }
799}
800
801pub struct RemoteBlockClientSync {
802    session: block::SessionSynchronousProxy,
803    common: Common,
804}
805
806impl RemoteBlockClientSync {
807    /// Returns a connection to a remote block device via the given channel, but spawns a separate
808    /// thread for polling the fifo which makes it work in cases where no executor is configured for
809    /// the calling thread.
810    pub fn new(
811        client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
812    ) -> Result<Self, zx::Status> {
813        let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
814        let info = remote
815            .get_info(zx::MonotonicInstant::INFINITE)
816            .map_err(fidl_to_status)?
817            .map_err(zx::Status::from_raw)?;
818        let (client, server) = fidl::endpoints::create_endpoints();
819        let () = remote.open_session(server).map_err(fidl_to_status)?;
820        let session = block::SessionSynchronousProxy::new(client.into_channel());
821        let fifo = session
822            .get_fifo(zx::MonotonicInstant::INFINITE)
823            .map_err(fidl_to_status)?
824            .map_err(zx::Status::from_raw)?;
825        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
826        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
827        let vmo_id = session
828            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
829            .map_err(fidl_to_status)?
830            .map_err(zx::Status::from_raw)?;
831        let vmo_id = VmoId::new(vmo_id.id);
832
833        // The fifo needs to be instantiated from the thread that has the executor as that's where
834        // the fifo registers for notifications to be delivered.
835        let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
836        std::thread::spawn(move || {
837            let mut executor = fasync::LocalExecutor::default();
838            let fifo = fasync::Fifo::from_fifo(fifo);
839            let common = Common::new(fifo, &info, temp_vmo, vmo_id);
840            let fifo_state = common.fifo_state.clone();
841            let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
842            executor.run_singlethreaded(FifoPoller { fifo_state });
843        });
844        block_on(receiver).map_err(|_| zx::Status::CANCELED)?
845    }
846
847    pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
848        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
849        let vmo_id = self
850            .session
851            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
852            .map_err(fidl_to_status)?
853            .map_err(zx::Status::from_raw)?;
854        Ok(VmoId::new(vmo_id.id))
855    }
856
857    pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
858        block_on(self.common.detach_vmo(vmo_id))
859    }
860
861    pub fn read_at(
862        &self,
863        buffer_slice: MutableBufferSlice<'_>,
864        device_offset: u64,
865    ) -> Result<(), zx::Status> {
866        block_on(self.common.read_at(
867            buffer_slice,
868            device_offset,
869            ReadOptions::default(),
870            NO_TRACE_ID,
871        ))
872    }
873
874    pub fn write_at(
875        &self,
876        buffer_slice: BufferSlice<'_>,
877        device_offset: u64,
878    ) -> Result<(), zx::Status> {
879        block_on(self.common.write_at(
880            buffer_slice,
881            device_offset,
882            WriteOptions::default(),
883            NO_TRACE_ID,
884        ))
885    }
886
887    pub fn flush(&self) -> Result<(), zx::Status> {
888        block_on(self.common.flush(NO_TRACE_ID))
889    }
890
891    pub fn close(&self) -> Result<(), zx::Status> {
892        let () = self
893            .session
894            .close(zx::MonotonicInstant::INFINITE)
895            .map_err(fidl_to_status)?
896            .map_err(zx::Status::from_raw)?;
897        Ok(())
898    }
899
900    pub fn block_size(&self) -> u32 {
901        self.common.block_size()
902    }
903
904    pub fn block_count(&self) -> u64 {
905        self.common.block_count()
906    }
907
908    pub fn is_connected(&self) -> bool {
909        self.common.is_connected()
910    }
911}
912
913impl Drop for RemoteBlockClientSync {
914    fn drop(&mut self) {
915        // Ignore errors here as there is not much we can do about it.
916        let _ = self.close();
917    }
918}
919
920// FifoPoller is a future responsible for sending and receiving from the fifo.
921struct FifoPoller {
922    fifo_state: FifoStateRef,
923}
924
925impl Future for FifoPoller {
926    type Output = ();
927
928    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
929        let mut state_lock = self.fifo_state.lock();
930        let state = state_lock.deref_mut(); // So that we can split the borrow.
931
932        // Send requests.
933        if state.poll_send_requests(context) {
934            return Poll::Ready(());
935        }
936
937        // Receive responses.
938        let fifo = state.fifo.as_ref().unwrap(); // Safe because poll_send_requests checks.
939        loop {
940            let mut response = MaybeUninit::uninit();
941            match fifo.try_read(context, &mut response) {
942                Poll::Pending => {
943                    state.poller_waker = Some(context.waker().clone());
944                    return Poll::Pending;
945                }
946                Poll::Ready(Ok(_)) => {
947                    let response = unsafe { response.assume_init() };
948                    let request_id = response.reqid;
949                    // If the request isn't in the map, assume that it's a cancelled read.
950                    if let Some(request_state) = state.map.get_mut(&request_id) {
951                        request_state.result.replace(zx::Status::from_raw(response.status));
952                        if let Some(waker) = request_state.waker.take() {
953                            waker.wake();
954                        }
955                    }
956                }
957                Poll::Ready(Err(_)) => {
958                    state.terminate();
959                    return Poll::Ready(());
960                }
961            }
962        }
963    }
964}
965
966#[cfg(test)]
967mod tests {
968    use super::{
969        BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
970        RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
971    };
972    use block_protocol::ReadOptions;
973    use block_server::{BlockServer, DeviceInfo, PartitionInfo};
974    use fidl::endpoints::RequestStream as _;
975    use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
976    use futures::join;
977    use futures::stream::StreamExt as _;
978    use futures::stream::futures_unordered::FuturesUnordered;
979    use ramdevice_client::RamdiskClient;
980    use std::borrow::Cow;
981    use std::num::NonZero;
982    use std::sync::Arc;
983    use std::sync::atomic::{AtomicBool, Ordering};
984    use {fidl_fuchsia_storage_block as block, fuchsia_async as fasync};
985
986    const RAMDISK_BLOCK_SIZE: u64 = 1024;
987    const RAMDISK_BLOCK_COUNT: u64 = 1024;
988
989    pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
990        let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
991            .await
992            .expect("RamdiskClient::create failed");
993        let client_end = ramdisk.open().expect("ramdisk.open failed");
994        let proxy = client_end.into_proxy();
995        let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
996        assert_eq!(block_client.block_size(), 1024);
997        let client_end = ramdisk.open().expect("ramdisk.open failed");
998        let proxy = client_end.into_proxy();
999        (ramdisk, proxy, block_client)
1000    }
1001
1002    #[fuchsia::test]
1003    async fn test_against_ram_disk() {
1004        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1005
1006        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1007        vmo.write(b"hello", 5).expect("vmo.write failed");
1008        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1009        block_client
1010            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1011            .await
1012            .expect("write_at failed");
1013        block_client
1014            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
1015            .await
1016            .expect("read_at failed");
1017        let mut buf: [u8; 5] = Default::default();
1018        vmo.read(&mut buf, 1029).expect("vmo.read failed");
1019        assert_eq!(&buf, b"hello");
1020        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1021    }
1022
1023    #[fuchsia::test]
1024    async fn test_alignment() {
1025        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1026        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1027        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1028        block_client
1029            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
1030            .await
1031            .expect_err("expected failure due to bad alignment");
1032        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1033    }
1034
1035    #[fuchsia::test]
1036    async fn test_parallel_io() {
1037        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1038        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1039        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1040        let mut reads = Vec::new();
1041        for _ in 0..1024 {
1042            reads.push(
1043                block_client
1044                    .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1045                    .inspect_err(|e| panic!("read should have succeeded: {}", e)),
1046            );
1047        }
1048        futures::future::join_all(reads).await;
1049        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1050    }
1051
1052    #[fuchsia::test]
1053    async fn test_closed_device() {
1054        let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1055        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1056        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1057        let mut reads = Vec::new();
1058        for _ in 0..1024 {
1059            reads.push(
1060                block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1061            );
1062        }
1063        assert!(block_client.is_connected());
1064        let _ = futures::join!(futures::future::join_all(reads), async {
1065            ramdisk.destroy().await.expect("ramdisk.destroy failed")
1066        });
1067        // Destroying the ramdisk is asynchronous. Keep issuing reads until they start failing.
1068        while block_client
1069            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1070            .await
1071            .is_ok()
1072        {}
1073
1074        // Sometimes the FIFO will start rejecting requests before FIFO is actually closed, so we
1075        // get false-positives from is_connected.
1076        while block_client.is_connected() {
1077            // Sleep for a bit to minimise lock contention.
1078            fasync::Timer::new(fasync::MonotonicInstant::after(
1079                zx::MonotonicDuration::from_millis(500),
1080            ))
1081            .await;
1082        }
1083
1084        // But once is_connected goes negative, it should stay negative.
1085        assert_eq!(block_client.is_connected(), false);
1086        let _ = block_client.detach_vmo(vmo_id).await;
1087    }
1088
1089    #[fuchsia::test]
1090    async fn test_cancelled_reads() {
1091        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1092        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1093        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1094        {
1095            let mut reads = FuturesUnordered::new();
1096            for _ in 0..1024 {
1097                reads.push(
1098                    block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1099                );
1100            }
1101            // Read the first 500 results and then dump the rest.
1102            for _ in 0..500 {
1103                reads.next().await;
1104            }
1105        }
1106        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1107    }
1108
1109    #[fuchsia::test]
1110    async fn test_parallel_large_read_and_write_with_memory_succeds() {
1111        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1112        let block_client_ref = &block_client;
1113        let test_one = |offset, len, fill| async move {
1114            let buf = vec![fill; len];
1115            block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
1116            // Read back an extra block either side.
1117            let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
1118            block_client_ref
1119                .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
1120                .await
1121                .expect("read_at failed");
1122            assert_eq!(
1123                &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
1124                &[0; RAMDISK_BLOCK_SIZE as usize][..]
1125            );
1126            assert_eq!(
1127                &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
1128                &buf[..]
1129            );
1130            assert_eq!(
1131                &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
1132                &[0; RAMDISK_BLOCK_SIZE as usize][..]
1133            );
1134        };
1135        const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
1136        join!(
1137            test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
1138            test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
1139        );
1140    }
1141
1142    // Implements dummy server which can be used by test cases to verify whether
1143    // channel messages and fifo operations are being received - by using set_channel_handler or
1144    // set_fifo_hander respectively
1145    struct FakeBlockServer<'a> {
1146        server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
1147        channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
1148        fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
1149    }
1150
1151    impl<'a> FakeBlockServer<'a> {
1152        // Creates a new FakeBlockServer given a channel to listen on.
1153        //
1154        // 'channel_handler' and 'fifo_handler' closures allow for customizing the way how the server
1155        // handles requests received from channel or the fifo respectfully.
1156        //
1157        // 'channel_handler' receives a message before it is handled by the default implementation
1158        // and can return 'true' to indicate all processing is done and no further processing of
1159        // that message is required
1160        //
1161        // 'fifo_handler' takes as input a BlockFifoRequest and produces a response which the
1162        // FakeBlockServer will send over the fifo.
1163        fn new(
1164            server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
1165            channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
1166            fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
1167        ) -> FakeBlockServer<'a> {
1168            FakeBlockServer {
1169                server_channel: Some(server_channel),
1170                channel_handler: Box::new(channel_handler),
1171                fifo_handler: Box::new(fifo_handler),
1172            }
1173        }
1174
1175        // Runs the server
1176        async fn run(&mut self) {
1177            let server = self.server_channel.take().unwrap();
1178
1179            // Set up a mock server.
1180            let (server_fifo, client_fifo) =
1181                zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
1182                    .expect("Fifo::create failed");
1183            let maybe_server_fifo = fuchsia_sync::Mutex::new(Some(client_fifo));
1184
1185            let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
1186            let fifo_future = Abortable::new(
1187                async {
1188                    let mut fifo = fasync::Fifo::from_fifo(server_fifo);
1189                    let (mut reader, mut writer) = fifo.async_io();
1190                    let mut request = BlockFifoRequest::default();
1191                    loop {
1192                        match reader.read_entries(&mut request).await {
1193                            Ok(1) => {}
1194                            Err(zx::Status::PEER_CLOSED) => break,
1195                            Err(e) => panic!("read_entry failed {:?}", e),
1196                            _ => unreachable!(),
1197                        };
1198
1199                        let response = self.fifo_handler.as_ref()(request);
1200                        writer
1201                            .write_entries(std::slice::from_ref(&response))
1202                            .await
1203                            .expect("write_entries failed");
1204                    }
1205                },
1206                fifo_future_abort_registration,
1207            );
1208
1209            let channel_future = async {
1210                server
1211                    .into_stream()
1212                    .for_each_concurrent(None, |request| async {
1213                        let request = request.expect("unexpected fidl error");
1214
1215                        match request {
1216                            block::BlockRequest::GetInfo { responder } => {
1217                                responder
1218                                    .send(Ok(&block::BlockInfo {
1219                                        block_count: 1024,
1220                                        block_size: 512,
1221                                        max_transfer_size: 1024 * 1024,
1222                                        flags: block::DeviceFlag::empty(),
1223                                    }))
1224                                    .expect("send failed");
1225                            }
1226                            block::BlockRequest::OpenSession { session, control_handle: _ } => {
1227                                let stream = session.into_stream();
1228                                stream
1229                                    .for_each(|request| async {
1230                                        let request = request.expect("unexpected fidl error");
1231                                        // Give a chance for the test to register and potentially
1232                                        // handle the event
1233                                        if self.channel_handler.as_ref()(&request) {
1234                                            return;
1235                                        }
1236                                        match request {
1237                                            block::SessionRequest::GetFifo { responder } => {
1238                                                match maybe_server_fifo.lock().take() {
1239                                                    Some(fifo) => {
1240                                                        responder.send(Ok(fifo.downcast()))
1241                                                    }
1242                                                    None => responder.send(Err(
1243                                                        zx::Status::NO_RESOURCES.into_raw(),
1244                                                    )),
1245                                                }
1246                                                .expect("send failed")
1247                                            }
1248                                            block::SessionRequest::AttachVmo {
1249                                                vmo: _,
1250                                                responder,
1251                                            } => responder
1252                                                .send(Ok(&block::VmoId { id: 1 }))
1253                                                .expect("send failed"),
1254                                            block::SessionRequest::Close { responder } => {
1255                                                fifo_future_abort.abort();
1256                                                responder.send(Ok(())).expect("send failed")
1257                                            }
1258                                        }
1259                                    })
1260                                    .await
1261                            }
1262                            _ => panic!("Unexpected message"),
1263                        }
1264                    })
1265                    .await;
1266            };
1267
1268            let _result = join!(fifo_future, channel_future);
1269            //_result can be Err(Aborted) since FifoClose calls .abort but that's expected
1270        }
1271    }
1272
1273    #[fuchsia::test]
1274    async fn test_block_close_is_called() {
1275        let close_called = fuchsia_sync::Mutex::new(false);
1276        let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();
1277
1278        std::thread::spawn(move || {
1279            let _block_client =
1280                RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
1281            // The drop here should cause Close to be sent.
1282        });
1283
1284        let channel_handler = |request: &block::SessionRequest| -> bool {
1285            if let block::SessionRequest::Close { .. } = request {
1286                *close_called.lock() = true;
1287            }
1288            false
1289        };
1290        FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;
1291
1292        // After the server has finished running, we can check to see that close was called.
1293        assert!(*close_called.lock());
1294    }
1295
1296    #[fuchsia::test]
1297    async fn test_block_flush_is_called() {
1298        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();
1299
1300        struct Interface {
1301            flush_called: Arc<AtomicBool>,
1302        }
1303        impl block_server::async_interface::Interface for Interface {
1304            fn get_info(&self) -> Cow<'_, DeviceInfo> {
1305                Cow::Owned(DeviceInfo::Partition(PartitionInfo {
1306                    device_flags: fidl_fuchsia_storage_block::DeviceFlag::empty(),
1307                    max_transfer_blocks: None,
1308                    block_range: Some(0..1000),
1309                    type_guid: [0; 16],
1310                    instance_guid: [0; 16],
1311                    name: "foo".to_string(),
1312                    flags: 0,
1313                }))
1314            }
1315
1316            async fn read(
1317                &self,
1318                _device_block_offset: u64,
1319                _block_count: u32,
1320                _vmo: &Arc<zx::Vmo>,
1321                _vmo_offset: u64,
1322                _opts: ReadOptions,
1323                _trace_flow_id: Option<NonZero<u64>>,
1324            ) -> Result<(), zx::Status> {
1325                unreachable!();
1326            }
1327
1328            async fn write(
1329                &self,
1330                _device_block_offset: u64,
1331                _block_count: u32,
1332                _vmo: &Arc<zx::Vmo>,
1333                _vmo_offset: u64,
1334                _opts: WriteOptions,
1335                _trace_flow_id: Option<NonZero<u64>>,
1336            ) -> Result<(), zx::Status> {
1337                unreachable!();
1338            }
1339
1340            async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1341                self.flush_called.store(true, Ordering::Relaxed);
1342                Ok(())
1343            }
1344
1345            async fn trim(
1346                &self,
1347                _device_block_offset: u64,
1348                _block_count: u32,
1349                _trace_flow_id: Option<NonZero<u64>>,
1350            ) -> Result<(), zx::Status> {
1351                unreachable!();
1352            }
1353        }
1354
1355        let flush_called = Arc::new(AtomicBool::new(false));
1356
1357        futures::join!(
1358            async {
1359                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1360
1361                block_client.flush().await.expect("flush failed");
1362            },
1363            async {
1364                let block_server = BlockServer::new(
1365                    512,
1366                    Arc::new(Interface { flush_called: flush_called.clone() }),
1367                );
1368                block_server.handle_requests(stream.cast_stream()).await.unwrap();
1369            }
1370        );
1371
1372        assert!(flush_called.load(Ordering::Relaxed));
1373    }
1374
1375    #[fuchsia::test]
1376    async fn test_trace_flow_ids_set() {
1377        let (proxy, server) = fidl::endpoints::create_proxy();
1378
1379        futures::join!(
1380            async {
1381                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1382                block_client.flush().await.expect("flush failed");
1383            },
1384            async {
1385                let flow_id: fuchsia_sync::Mutex<Option<u64>> = fuchsia_sync::Mutex::new(None);
1386                let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
1387                    if request.trace_flow_id > 0 {
1388                        *flow_id.lock() = Some(request.trace_flow_id);
1389                    }
1390                    BlockFifoResponse {
1391                        status: zx::Status::OK.into_raw(),
1392                        reqid: request.reqid,
1393                        ..Default::default()
1394                    }
1395                };
1396                FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
1397                // After the server has finished running, verify the trace flow ID was set to some value.
1398                assert!(flow_id.lock().is_some());
1399            }
1400        );
1401    }
1402}