Skip to main content

block_client/
lib.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! A Rust client library for the Fuchsia block protocol.
6//!
7//! This crate provides a low-level client for interacting with block devices over the
8//! `fuchsia.hardware.block` FIDL protocol and the FIFO interface for block I/O.
9//!
10//! See the [`BlockClient`] trait.
11
12use fidl_fuchsia_storage_block as block;
13use fidl_fuchsia_storage_block::{MAX_TRANSFER_UNBOUNDED, VMOID_INVALID};
14use fuchsia_async as fasync;
15use fuchsia_sync::Mutex;
16use futures::channel::oneshot;
17use futures::executor::block_on;
18use std::borrow::Borrow;
19use std::collections::HashMap;
20use std::future::Future;
21use std::hash::{Hash, Hasher};
22use std::mem::MaybeUninit;
23use std::num::NonZero;
24use std::ops::{DerefMut, Range};
25use std::pin::Pin;
26use std::sync::atomic::{AtomicU16, Ordering};
27use std::sync::{Arc, LazyLock};
28use std::task::{Context, Poll, Waker};
29use storage_trace as trace;
30use zx::sys::zx_handle_t;
31use zx::{self as zx, HandleBased as _};
32
33pub use cache::Cache;
34
35pub use block::DeviceFlag as BlockDeviceFlag;
36
37pub use block_protocol::*;
38
39pub mod cache;
40
41const TEMP_VMO_SIZE: usize = 65536;
42
43/// If a trace flow ID isn't specified for requests, one will be generated.
44pub const NO_TRACE_ID: u64 = 0;
45
46pub use fidl_fuchsia_storage_block::{BlockIoFlag, BlockOpcode};
47
48fn fidl_to_status(error: fidl::Error) -> zx::Status {
49    match error {
50        fidl::Error::ClientChannelClosed { status, .. } => status,
51        _ => zx::Status::INTERNAL,
52    }
53}
54
55fn opcode_str(opcode: u8) -> &'static str {
56    match BlockOpcode::from_primitive(opcode) {
57        Some(BlockOpcode::Read) => "read",
58        Some(BlockOpcode::Write) => "write",
59        Some(BlockOpcode::Flush) => "flush",
60        Some(BlockOpcode::Trim) => "trim",
61        Some(BlockOpcode::CloseVmo) => "close_vmo",
62        None => "unknown",
63    }
64}
65
66// Generates a trace ID that will be unique across the system (as long as |request_id| isn't
67// reused within this process).
68fn generate_trace_flow_id(request_id: u32) -> u64 {
69    static SELF_HANDLE: LazyLock<zx_handle_t> =
70        LazyLock::new(|| fuchsia_runtime::process_self().raw_handle());
71    *SELF_HANDLE as u64 + (request_id as u64) << 32
72}
73
74pub enum BufferSlice<'a> {
75    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
76    Memory(&'a [u8]),
77}
78
79impl<'a> BufferSlice<'a> {
80    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
81        BufferSlice::VmoId { vmo_id, offset, length }
82    }
83}
84
85impl<'a> From<&'a [u8]> for BufferSlice<'a> {
86    fn from(buf: &'a [u8]) -> Self {
87        BufferSlice::Memory(buf)
88    }
89}
90
91pub enum MutableBufferSlice<'a> {
92    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
93    Memory(&'a mut [u8]),
94}
95
96impl<'a> MutableBufferSlice<'a> {
97    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
98        MutableBufferSlice::VmoId { vmo_id, offset, length }
99    }
100}
101
102impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
103    fn from(buf: &'a mut [u8]) -> Self {
104        MutableBufferSlice::Memory(buf)
105    }
106}
107
108#[derive(Default)]
109struct RequestState {
110    result: Option<zx::Status>,
111    waker: Option<Waker>,
112}
113
114#[derive(Default)]
115struct FifoState {
116    // The fifo.
117    fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,
118
119    // The next request ID to be used.
120    next_request_id: u32,
121
122    // A queue of messages to be sent on the fifo.
123    queue: std::collections::VecDeque<BlockFifoRequest>,
124
125    // Map from request ID to RequestState.
126    map: HashMap<u32, RequestState>,
127
128    // The waker for the FifoPoller.
129    poller_waker: Option<Waker>,
130
131    // If set, attach a barrier to the next write request
132    attach_barrier: bool,
133}
134
135impl FifoState {
136    fn terminate(&mut self) {
137        self.fifo.take();
138        for (_, request_state) in self.map.iter_mut() {
139            request_state.result.get_or_insert(zx::Status::CANCELED);
140            if let Some(waker) = request_state.waker.take() {
141                waker.wake();
142            }
143        }
144        if let Some(waker) = self.poller_waker.take() {
145            waker.wake();
146        }
147    }
148
149    // Returns true if polling should be terminated.
150    fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
151        let fifo = if let Some(fifo) = self.fifo.as_ref() {
152            fifo
153        } else {
154            return true;
155        };
156
157        loop {
158            let slice = self.queue.as_slices().0;
159            if slice.is_empty() {
160                return false;
161            }
162            match fifo.try_write(context, slice) {
163                Poll::Ready(Ok(sent)) => {
164                    self.queue.drain(0..sent);
165                }
166                Poll::Ready(Err(_)) => {
167                    self.terminate();
168                    return true;
169                }
170                Poll::Pending => {
171                    return false;
172                }
173            }
174        }
175    }
176}
177
178type FifoStateRef = Arc<Mutex<FifoState>>;
179
180// A future used for fifo responses.
181struct ResponseFuture {
182    request_id: u32,
183    fifo_state: FifoStateRef,
184}
185
186impl ResponseFuture {
187    fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
188        ResponseFuture { request_id, fifo_state }
189    }
190}
191
192impl Future for ResponseFuture {
193    type Output = Result<(), zx::Status>;
194
195    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
196        let mut state = self.fifo_state.lock();
197        let request_state = state.map.get_mut(&self.request_id).unwrap();
198        if let Some(result) = request_state.result {
199            Poll::Ready(result.into())
200        } else {
201            request_state.waker.replace(context.waker().clone());
202            Poll::Pending
203        }
204    }
205}
206
207impl Drop for ResponseFuture {
208    fn drop(&mut self) {
209        let mut state = self.fifo_state.lock();
210        state.map.remove(&self.request_id).unwrap();
211        update_outstanding_requests_counter(state.map.len());
212    }
213}
214
215/// Wraps a vmo-id. Will panic if you forget to detach.
216#[derive(Debug)]
217#[must_use]
218pub struct VmoId(AtomicU16);
219
220impl VmoId {
221    /// VmoIds will normally be vended by attach_vmo, but this might be used in some tests
222    pub fn new(id: u16) -> Self {
223        Self(AtomicU16::new(id))
224    }
225
226    /// Invalidates self and returns a new VmoId with the same underlying ID.
227    pub fn take(&self) -> Self {
228        Self(AtomicU16::new(self.0.swap(VMOID_INVALID, Ordering::Relaxed)))
229    }
230
231    pub fn is_valid(&self) -> bool {
232        self.id() != VMOID_INVALID
233    }
234
235    /// Takes the ID.  The caller assumes responsibility for detaching.
236    #[must_use]
237    pub fn into_id(self) -> u16 {
238        self.0.swap(VMOID_INVALID, Ordering::Relaxed)
239    }
240
241    pub fn id(&self) -> u16 {
242        self.0.load(Ordering::Relaxed)
243    }
244}
245
246impl PartialEq for VmoId {
247    fn eq(&self, other: &Self) -> bool {
248        self.id() == other.id()
249    }
250}
251
252impl Eq for VmoId {}
253
254impl Drop for VmoId {
255    fn drop(&mut self) {
256        assert_eq!(self.0.load(Ordering::Relaxed), VMOID_INVALID, "Did you forget to detach?");
257    }
258}
259
260impl Hash for VmoId {
261    fn hash<H: Hasher>(&self, state: &mut H) {
262        self.id().hash(state);
263    }
264}
265
266/// Represents a client connection to a block device. This is a simplified version of the block.fidl
267/// interface.
268/// Most users will use the RemoteBlockClient instantiation of this trait.
269pub trait BlockClient: Send + Sync {
270    /// Wraps AttachVmo from fuchsia.hardware.block::Block.
271    fn attach_vmo(&self, vmo: &zx::Vmo) -> impl Future<Output = Result<VmoId, zx::Status>> + Send;
272
273    /// Detaches the given vmo-id from the device.
274    fn detach_vmo(&self, vmo_id: VmoId) -> impl Future<Output = Result<(), zx::Status>> + Send;
275
276    /// Reads from the device at |device_offset| into the given buffer slice.
277    fn read_at(
278        &self,
279        buffer_slice: MutableBufferSlice<'_>,
280        device_offset: u64,
281    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
282        self.read_at_with_opts_traced(buffer_slice, device_offset, ReadOptions::default(), 0)
283    }
284
285    fn read_at_with_opts(
286        &self,
287        buffer_slice: MutableBufferSlice<'_>,
288        device_offset: u64,
289        opts: ReadOptions,
290    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
291        self.read_at_with_opts_traced(buffer_slice, device_offset, opts, 0)
292    }
293
294    fn read_at_with_opts_traced(
295        &self,
296        buffer_slice: MutableBufferSlice<'_>,
297        device_offset: u64,
298        opts: ReadOptions,
299        trace_flow_id: u64,
300    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
301
302    /// Writes the data in |buffer_slice| to the device.
303    fn write_at(
304        &self,
305        buffer_slice: BufferSlice<'_>,
306        device_offset: u64,
307    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
308        self.write_at_with_opts_traced(
309            buffer_slice,
310            device_offset,
311            WriteOptions::default(),
312            NO_TRACE_ID,
313        )
314    }
315
316    fn write_at_with_opts(
317        &self,
318        buffer_slice: BufferSlice<'_>,
319        device_offset: u64,
320        opts: WriteOptions,
321    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
322        self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID)
323    }
324
325    fn write_at_with_opts_traced(
326        &self,
327        buffer_slice: BufferSlice<'_>,
328        device_offset: u64,
329        opts: WriteOptions,
330        trace_flow_id: u64,
331    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
332
333    /// Trims the given range on the block device.
334    fn trim(
335        &self,
336        device_range: Range<u64>,
337    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
338        self.trim_traced(device_range, NO_TRACE_ID)
339    }
340
341    fn trim_traced(
342        &self,
343        device_range: Range<u64>,
344        trace_flow_id: u64,
345    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
346
347    /// Attaches a barrier to the next write sent to the underlying block device. This barrier
348    /// method is an alternative to setting the WriteOption::PRE_BARRIER on `write_at_with_opts`.
349    /// This method makes it easier to guarantee that the barrier is attached to the correct
350    /// write operation when subsequent write operations can get reordered.
351    fn barrier(&self);
352
353    fn flush(&self) -> impl Future<Output = Result<(), zx::Status>> + Send {
354        self.flush_traced(NO_TRACE_ID)
355    }
356
357    /// Sends a flush request to the underlying block device.
358    fn flush_traced(
359        &self,
360        trace_flow_id: u64,
361    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
362
363    /// Closes the fifo.
364    fn close(&self) -> impl Future<Output = Result<(), zx::Status>> + Send;
365
366    /// Returns the block size of the device.
367    fn block_size(&self) -> u32;
368
369    /// Returns the size, in blocks, of the device.
370    fn block_count(&self) -> u64;
371
372    /// Returns the maximum number of blocks which can be transferred in a single request.
373    fn max_transfer_blocks(&self) -> Option<NonZero<u32>>;
374
375    /// Returns the block flags reported by the device.
376    fn block_flags(&self) -> BlockDeviceFlag;
377
378    /// Returns true if the remote fifo is still connected.
379    fn is_connected(&self) -> bool;
380}
381
382struct Common {
383    block_size: u32,
384    block_count: u64,
385    max_transfer_blocks: Option<NonZero<u32>>,
386    block_flags: BlockDeviceFlag,
387    fifo_state: FifoStateRef,
388    temp_vmo: futures::lock::Mutex<zx::Vmo>,
389    temp_vmo_id: VmoId,
390}
391
392impl Common {
393    fn new(
394        fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
395        info: &block::BlockInfo,
396        temp_vmo: zx::Vmo,
397        temp_vmo_id: VmoId,
398    ) -> Self {
399        let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
400        fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
401        Self {
402            block_size: info.block_size,
403            block_count: info.block_count,
404            max_transfer_blocks: if info.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
405                NonZero::new(info.max_transfer_size / info.block_size)
406            } else {
407                None
408            },
409            block_flags: info.flags,
410            fifo_state,
411            temp_vmo: futures::lock::Mutex::new(temp_vmo),
412            temp_vmo_id,
413        }
414    }
415
416    fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
417        if bytes % self.block_size as u64 != 0 {
418            Err(zx::Status::INVALID_ARGS)
419        } else {
420            Ok(bytes / self.block_size as u64)
421        }
422    }
423
424    // Sends the request and waits for the response.
425    async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
426        let (request_id, trace_flow_id) = {
427            let mut state = self.fifo_state.lock();
428
429            let mut flags = BlockIoFlag::from_bits_retain(request.command.flags);
430            if BlockOpcode::from_primitive(request.command.opcode) == Some(BlockOpcode::Write)
431                && state.attach_barrier
432            {
433                flags |= BlockIoFlag::PRE_BARRIER;
434                request.command.flags = flags.bits();
435                state.attach_barrier = false;
436            }
437
438            if state.fifo.is_none() {
439                // Fifo has been closed.
440                return Err(zx::Status::CANCELED);
441            }
442            trace::duration!(
443                "storage",
444                "block_client::send::start",
445                "op" => opcode_str(request.command.opcode),
446                "len" => request.length * self.block_size
447            );
448            let request_id = state.next_request_id;
449            state.next_request_id = state.next_request_id.overflowing_add(1).0;
450            assert!(
451                state.map.insert(request_id, RequestState::default()).is_none(),
452                "request id in use!"
453            );
454            update_outstanding_requests_counter(state.map.len());
455            request.reqid = request_id;
456            if request.trace_flow_id == NO_TRACE_ID {
457                request.trace_flow_id = generate_trace_flow_id(request_id);
458            }
459            let trace_flow_id = request.trace_flow_id;
460            trace::flow_begin!("storage", "block_client::send", trace_flow_id.into());
461            state.queue.push_back(request);
462            if let Some(waker) = state.poller_waker.clone() {
463                state.poll_send_requests(&mut Context::from_waker(&waker));
464            }
465            (request_id, trace_flow_id)
466        };
467        ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
468        trace::duration!("storage", "block_client::send::end");
469        trace::flow_end!("storage", "block_client::send", trace_flow_id.into());
470        Ok(())
471    }
472
473    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
474        self.send(BlockFifoRequest {
475            command: BlockFifoCommand {
476                opcode: BlockOpcode::CloseVmo.into_primitive(),
477                flags: 0,
478                ..Default::default()
479            },
480            vmoid: vmo_id.into_id(),
481            ..Default::default()
482        })
483        .await
484    }
485
486    async fn read_at(
487        &self,
488        buffer_slice: MutableBufferSlice<'_>,
489        device_offset: u64,
490        opts: ReadOptions,
491        trace_flow_id: u64,
492    ) -> Result<(), zx::Status> {
493        let mut flags = BlockIoFlag::empty();
494
495        if opts.inline_crypto.is_enabled {
496            flags |= BlockIoFlag::INLINE_ENCRYPTION_ENABLED;
497        }
498
499        match buffer_slice {
500            MutableBufferSlice::VmoId { vmo_id, offset, length } => {
501                self.send(BlockFifoRequest {
502                    command: BlockFifoCommand {
503                        opcode: BlockOpcode::Read.into_primitive(),
504                        flags: flags.bits(),
505                        ..Default::default()
506                    },
507                    vmoid: vmo_id.id(),
508                    length: self
509                        .to_blocks(length)?
510                        .try_into()
511                        .map_err(|_| zx::Status::INVALID_ARGS)?,
512                    vmo_offset: self.to_blocks(offset)?,
513                    dev_offset: self.to_blocks(device_offset)?,
514                    trace_flow_id,
515                    dun: opts.inline_crypto.dun,
516                    slot: opts.inline_crypto.slot,
517                    ..Default::default()
518                })
519                .await?
520            }
521            MutableBufferSlice::Memory(mut slice) => {
522                let temp_vmo = self.temp_vmo.lock().await;
523                let mut device_block = self.to_blocks(device_offset)?;
524                loop {
525                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
526                    let block_count = self.to_blocks(to_do as u64)? as u32;
527                    self.send(BlockFifoRequest {
528                        command: BlockFifoCommand {
529                            opcode: BlockOpcode::Read.into_primitive(),
530                            flags: flags.bits(),
531                            ..Default::default()
532                        },
533                        vmoid: self.temp_vmo_id.id(),
534                        length: block_count,
535                        vmo_offset: 0,
536                        dev_offset: device_block,
537                        trace_flow_id,
538                        dun: opts.inline_crypto.dun,
539                        slot: opts.inline_crypto.slot,
540                        ..Default::default()
541                    })
542                    .await?;
543                    temp_vmo.read(&mut slice[..to_do], 0)?;
544                    if to_do == slice.len() {
545                        break;
546                    }
547                    device_block += block_count as u64;
548                    slice = &mut slice[to_do..];
549                }
550            }
551        }
552        Ok(())
553    }
554
555    async fn write_at(
556        &self,
557        buffer_slice: BufferSlice<'_>,
558        device_offset: u64,
559        opts: WriteOptions,
560        trace_flow_id: u64,
561    ) -> Result<(), zx::Status> {
562        let mut flags = BlockIoFlag::empty();
563
564        if opts.flags.contains(WriteFlags::FORCE_ACCESS) {
565            flags |= BlockIoFlag::FORCE_ACCESS;
566        }
567
568        if opts.flags.contains(WriteFlags::PRE_BARRIER) {
569            flags |= BlockIoFlag::PRE_BARRIER;
570        }
571
572        if opts.inline_crypto.is_enabled {
573            flags |= BlockIoFlag::INLINE_ENCRYPTION_ENABLED;
574        }
575
576        match buffer_slice {
577            BufferSlice::VmoId { vmo_id, offset, length } => {
578                self.send(BlockFifoRequest {
579                    command: BlockFifoCommand {
580                        opcode: BlockOpcode::Write.into_primitive(),
581                        flags: flags.bits(),
582                        ..Default::default()
583                    },
584                    vmoid: vmo_id.id(),
585                    length: self
586                        .to_blocks(length)?
587                        .try_into()
588                        .map_err(|_| zx::Status::INVALID_ARGS)?,
589                    vmo_offset: self.to_blocks(offset)?,
590                    dev_offset: self.to_blocks(device_offset)?,
591                    trace_flow_id,
592                    dun: opts.inline_crypto.dun,
593                    slot: opts.inline_crypto.slot,
594                    ..Default::default()
595                })
596                .await?;
597            }
598            BufferSlice::Memory(mut slice) => {
599                let temp_vmo = self.temp_vmo.lock().await;
600                let mut device_block = self.to_blocks(device_offset)?;
601                loop {
602                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
603                    let block_count = self.to_blocks(to_do as u64)? as u32;
604                    temp_vmo.write(&slice[..to_do], 0)?;
605                    self.send(BlockFifoRequest {
606                        command: BlockFifoCommand {
607                            opcode: BlockOpcode::Write.into_primitive(),
608                            flags: flags.bits(),
609                            ..Default::default()
610                        },
611                        vmoid: self.temp_vmo_id.id(),
612                        length: block_count,
613                        vmo_offset: 0,
614                        dev_offset: device_block,
615                        trace_flow_id,
616                        dun: opts.inline_crypto.dun,
617                        slot: opts.inline_crypto.slot,
618                        ..Default::default()
619                    })
620                    .await?;
621                    if to_do == slice.len() {
622                        break;
623                    }
624                    device_block += block_count as u64;
625                    slice = &slice[to_do..];
626                }
627            }
628        }
629        Ok(())
630    }
631
632    async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
633        let length = self.to_blocks(device_range.end - device_range.start)? as u32;
634        let dev_offset = self.to_blocks(device_range.start)?;
635        self.send(BlockFifoRequest {
636            command: BlockFifoCommand {
637                opcode: BlockOpcode::Trim.into_primitive(),
638                flags: 0,
639                ..Default::default()
640            },
641            vmoid: VMOID_INVALID,
642            length,
643            dev_offset,
644            trace_flow_id,
645            ..Default::default()
646        })
647        .await
648    }
649
650    async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
651        self.send(BlockFifoRequest {
652            command: BlockFifoCommand {
653                opcode: BlockOpcode::Flush.into_primitive(),
654                flags: 0,
655                ..Default::default()
656            },
657            vmoid: VMOID_INVALID,
658            trace_flow_id,
659            ..Default::default()
660        })
661        .await
662    }
663
664    fn barrier(&self) {
665        self.fifo_state.lock().attach_barrier = true;
666    }
667
668    fn block_size(&self) -> u32 {
669        self.block_size
670    }
671
672    fn block_count(&self) -> u64 {
673        self.block_count
674    }
675
676    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
677        self.max_transfer_blocks.clone()
678    }
679
680    fn block_flags(&self) -> BlockDeviceFlag {
681        self.block_flags
682    }
683
684    fn is_connected(&self) -> bool {
685        self.fifo_state.lock().fifo.is_some()
686    }
687}
688
689impl Drop for Common {
690    fn drop(&mut self) {
691        // It's OK to leak the VMO id because the server will dump all VMOs when the fifo is torn
692        // down.
693        let _ = self.temp_vmo_id.take().into_id();
694        self.fifo_state.lock().terminate();
695    }
696}
697
698// RemoteBlockClient is a BlockClient that communicates with a real block device over FIDL.
699pub struct RemoteBlockClient {
700    session: block::SessionProxy,
701    common: Common,
702}
703
704impl RemoteBlockClient {
705    /// Returns a connection to a remote block device via the given channel.
706    pub async fn new(remote: impl Borrow<block::BlockProxy>) -> Result<Self, zx::Status> {
707        let remote = remote.borrow();
708        let info =
709            remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
710        let (session, server) = fidl::endpoints::create_proxy();
711        let () = remote.open_session(server).map_err(fidl_to_status)?;
712        Self::from_session(info, session).await
713    }
714
715    pub async fn from_session(
716        info: block::BlockInfo,
717        session: block::SessionProxy,
718    ) -> Result<Self, zx::Status> {
719        const SCRATCH_VMO_NAME: zx::Name = zx::Name::new_lossy("block-client-scratch-vmo");
720        let fifo =
721            session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
722        let fifo = fasync::Fifo::from_fifo(fifo);
723        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
724        temp_vmo.set_name(&SCRATCH_VMO_NAME)?;
725        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
726        let vmo_id =
727            session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
728        let vmo_id = VmoId::new(vmo_id.id);
729        Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
730    }
731}
732
733impl BlockClient for RemoteBlockClient {
734    async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
735        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
736        let vmo_id = self
737            .session
738            .attach_vmo(dup)
739            .await
740            .map_err(fidl_to_status)?
741            .map_err(zx::Status::from_raw)?;
742        Ok(VmoId::new(vmo_id.id))
743    }
744
745    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
746        self.common.detach_vmo(vmo_id).await
747    }
748
749    async fn read_at_with_opts_traced(
750        &self,
751        buffer_slice: MutableBufferSlice<'_>,
752        device_offset: u64,
753        opts: ReadOptions,
754        trace_flow_id: u64,
755    ) -> Result<(), zx::Status> {
756        self.common.read_at(buffer_slice, device_offset, opts, trace_flow_id).await
757    }
758
759    async fn write_at_with_opts_traced(
760        &self,
761        buffer_slice: BufferSlice<'_>,
762        device_offset: u64,
763        opts: WriteOptions,
764        trace_flow_id: u64,
765    ) -> Result<(), zx::Status> {
766        self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
767    }
768
769    async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
770        self.common.trim(range, trace_flow_id).await
771    }
772
773    async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
774        self.common.flush(trace_flow_id).await
775    }
776
777    fn barrier(&self) {
778        self.common.barrier()
779    }
780
781    async fn close(&self) -> Result<(), zx::Status> {
782        let () =
783            self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
784        Ok(())
785    }
786
787    fn block_size(&self) -> u32 {
788        self.common.block_size()
789    }
790
791    fn block_count(&self) -> u64 {
792        self.common.block_count()
793    }
794
795    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
796        self.common.max_transfer_blocks()
797    }
798
799    fn block_flags(&self) -> BlockDeviceFlag {
800        self.common.block_flags()
801    }
802
803    fn is_connected(&self) -> bool {
804        self.common.is_connected()
805    }
806}
807
808pub struct RemoteBlockClientSync {
809    session: block::SessionSynchronousProxy,
810    common: Common,
811}
812
813impl RemoteBlockClientSync {
814    /// Returns a connection to a remote block device via the given channel, but spawns a separate
815    /// thread for polling the fifo which makes it work in cases where no executor is configured for
816    /// the calling thread.
817    pub fn new(
818        client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
819    ) -> Result<Self, zx::Status> {
820        let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
821        let info = remote
822            .get_info(zx::MonotonicInstant::INFINITE)
823            .map_err(fidl_to_status)?
824            .map_err(zx::Status::from_raw)?;
825        let (client, server) = fidl::endpoints::create_endpoints();
826        let () = remote.open_session(server).map_err(fidl_to_status)?;
827        let session = block::SessionSynchronousProxy::new(client.into_channel());
828        let fifo = session
829            .get_fifo(zx::MonotonicInstant::INFINITE)
830            .map_err(fidl_to_status)?
831            .map_err(zx::Status::from_raw)?;
832        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
833        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
834        let vmo_id = session
835            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
836            .map_err(fidl_to_status)?
837            .map_err(zx::Status::from_raw)?;
838        let vmo_id = VmoId::new(vmo_id.id);
839
840        // The fifo needs to be instantiated from the thread that has the executor as that's where
841        // the fifo registers for notifications to be delivered.
842        let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
843        std::thread::spawn(move || {
844            let mut executor = fasync::LocalExecutor::default();
845            let fifo = fasync::Fifo::from_fifo(fifo);
846            let common = Common::new(fifo, &info, temp_vmo, vmo_id);
847            let fifo_state = common.fifo_state.clone();
848            let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
849            executor.run_singlethreaded(FifoPoller { fifo_state });
850        });
851        block_on(receiver).map_err(|_| zx::Status::CANCELED)?
852    }
853
854    pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
855        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
856        let vmo_id = self
857            .session
858            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
859            .map_err(fidl_to_status)?
860            .map_err(zx::Status::from_raw)?;
861        Ok(VmoId::new(vmo_id.id))
862    }
863
864    pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
865        block_on(self.common.detach_vmo(vmo_id))
866    }
867
868    pub fn read_at(
869        &self,
870        buffer_slice: MutableBufferSlice<'_>,
871        device_offset: u64,
872    ) -> Result<(), zx::Status> {
873        block_on(self.common.read_at(
874            buffer_slice,
875            device_offset,
876            ReadOptions::default(),
877            NO_TRACE_ID,
878        ))
879    }
880
881    pub fn write_at(
882        &self,
883        buffer_slice: BufferSlice<'_>,
884        device_offset: u64,
885    ) -> Result<(), zx::Status> {
886        block_on(self.common.write_at(
887            buffer_slice,
888            device_offset,
889            WriteOptions::default(),
890            NO_TRACE_ID,
891        ))
892    }
893
894    pub fn flush(&self) -> Result<(), zx::Status> {
895        block_on(self.common.flush(NO_TRACE_ID))
896    }
897
898    pub fn close(&self) -> Result<(), zx::Status> {
899        let () = self
900            .session
901            .close(zx::MonotonicInstant::INFINITE)
902            .map_err(fidl_to_status)?
903            .map_err(zx::Status::from_raw)?;
904        Ok(())
905    }
906
907    pub fn block_size(&self) -> u32 {
908        self.common.block_size()
909    }
910
911    pub fn block_count(&self) -> u64 {
912        self.common.block_count()
913    }
914
915    pub fn is_connected(&self) -> bool {
916        self.common.is_connected()
917    }
918}
919
920impl Drop for RemoteBlockClientSync {
921    fn drop(&mut self) {
922        // Ignore errors here as there is not much we can do about it.
923        let _ = self.close();
924    }
925}
926
927// FifoPoller is a future responsible for sending and receiving from the fifo.
928struct FifoPoller {
929    fifo_state: FifoStateRef,
930}
931
932impl Future for FifoPoller {
933    type Output = ();
934
935    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
936        let mut state_lock = self.fifo_state.lock();
937        let state = state_lock.deref_mut(); // So that we can split the borrow.
938
939        // Send requests.
940        if state.poll_send_requests(context) {
941            return Poll::Ready(());
942        }
943
944        // Receive responses.
945        let fifo = state.fifo.as_ref().unwrap(); // Safe because poll_send_requests checks.
946        loop {
947            let mut response = MaybeUninit::uninit();
948            match fifo.try_read(context, &mut response) {
949                Poll::Pending => {
950                    state.poller_waker = Some(context.waker().clone());
951                    return Poll::Pending;
952                }
953                Poll::Ready(Ok(_)) => {
954                    let response = unsafe { response.assume_init() };
955                    let request_id = response.reqid;
956                    // If the request isn't in the map, assume that it's a cancelled read.
957                    if let Some(request_state) = state.map.get_mut(&request_id) {
958                        request_state.result.replace(zx::Status::from_raw(response.status));
959                        if let Some(waker) = request_state.waker.take() {
960                            waker.wake();
961                        }
962                    }
963                }
964                Poll::Ready(Err(_)) => {
965                    state.terminate();
966                    return Poll::Ready(());
967                }
968            }
969        }
970    }
971}
972
973fn update_outstanding_requests_counter(outstanding: usize) {
974    trace::counter!("storage", "block-requests", 0, "outstanding" => outstanding);
975}
976
977#[cfg(test)]
978mod tests {
979    use super::{
980        BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
981        RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
982    };
983    use block_protocol::ReadOptions;
984    use block_server::{BlockServer, DeviceInfo, PartitionInfo};
985    use fidl::endpoints::RequestStream as _;
986    use fidl_fuchsia_storage_block as block;
987    use fuchsia_async as fasync;
988    use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
989    use futures::join;
990    use futures::stream::StreamExt as _;
991    use futures::stream::futures_unordered::FuturesUnordered;
992    use ramdevice_client::RamdiskClient;
993    use std::borrow::Cow;
994    use std::num::NonZero;
995    use std::sync::Arc;
996    use std::sync::atomic::{AtomicBool, Ordering};
997
998    const RAMDISK_BLOCK_SIZE: u64 = 1024;
999    const RAMDISK_BLOCK_COUNT: u64 = 1024;
1000
1001    pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
1002        let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
1003            .await
1004            .expect("RamdiskClient::create failed");
1005        let client_end = ramdisk.open().expect("ramdisk.open failed");
1006        let proxy = client_end.into_proxy();
1007        let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1008        assert_eq!(block_client.block_size(), 1024);
1009        let client_end = ramdisk.open().expect("ramdisk.open failed");
1010        let proxy = client_end.into_proxy();
1011        (ramdisk, proxy, block_client)
1012    }
1013
1014    #[fuchsia::test]
1015    async fn test_against_ram_disk() {
1016        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1017
1018        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1019        vmo.write(b"hello", 5).expect("vmo.write failed");
1020        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1021        block_client
1022            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1023            .await
1024            .expect("write_at failed");
1025        block_client
1026            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
1027            .await
1028            .expect("read_at failed");
1029        let mut buf: [u8; 5] = Default::default();
1030        vmo.read(&mut buf, 1029).expect("vmo.read failed");
1031        assert_eq!(&buf, b"hello");
1032        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1033    }
1034
1035    #[fuchsia::test]
1036    async fn test_alignment() {
1037        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1038        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1039        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1040        block_client
1041            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
1042            .await
1043            .expect_err("expected failure due to bad alignment");
1044        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1045    }
1046
1047    #[fuchsia::test]
1048    async fn test_parallel_io() {
1049        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1050        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1051        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1052        let mut reads = Vec::new();
1053        for _ in 0..1024 {
1054            reads.push(
1055                block_client
1056                    .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1057                    .inspect_err(|e| panic!("read should have succeeded: {}", e)),
1058            );
1059        }
1060        futures::future::join_all(reads).await;
1061        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1062    }
1063
1064    #[fuchsia::test]
1065    async fn test_closed_device() {
1066        let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1067        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1068        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1069        let mut reads = Vec::new();
1070        for _ in 0..1024 {
1071            reads.push(
1072                block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1073            );
1074        }
1075        assert!(block_client.is_connected());
1076        let _ = futures::join!(futures::future::join_all(reads), async {
1077            ramdisk.destroy().await.expect("ramdisk.destroy failed")
1078        });
1079        // Destroying the ramdisk is asynchronous. Keep issuing reads until they start failing.
1080        while block_client
1081            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1082            .await
1083            .is_ok()
1084        {}
1085
1086        // Sometimes the FIFO will start rejecting requests before FIFO is actually closed, so we
1087        // get false-positives from is_connected.
1088        while block_client.is_connected() {
1089            // Sleep for a bit to minimise lock contention.
1090            fasync::Timer::new(fasync::MonotonicInstant::after(
1091                zx::MonotonicDuration::from_millis(500),
1092            ))
1093            .await;
1094        }
1095
1096        // But once is_connected goes negative, it should stay negative.
1097        assert_eq!(block_client.is_connected(), false);
1098        let _ = block_client.detach_vmo(vmo_id).await;
1099    }
1100
1101    #[fuchsia::test]
1102    async fn test_cancelled_reads() {
1103        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1104        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1105        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1106        {
1107            let mut reads = FuturesUnordered::new();
1108            for _ in 0..1024 {
1109                reads.push(
1110                    block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1111                );
1112            }
1113            // Read the first 500 results and then dump the rest.
1114            for _ in 0..500 {
1115                reads.next().await;
1116            }
1117        }
1118        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1119    }
1120
1121    #[fuchsia::test]
1122    async fn test_parallel_large_read_and_write_with_memory_succeds() {
1123        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1124        let block_client_ref = &block_client;
1125        let test_one = |offset, len, fill| async move {
1126            let buf = vec![fill; len];
1127            block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
1128            // Read back an extra block either side.
1129            let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
1130            block_client_ref
1131                .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
1132                .await
1133                .expect("read_at failed");
1134            assert_eq!(
1135                &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
1136                &[0; RAMDISK_BLOCK_SIZE as usize][..]
1137            );
1138            assert_eq!(
1139                &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
1140                &buf[..]
1141            );
1142            assert_eq!(
1143                &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
1144                &[0; RAMDISK_BLOCK_SIZE as usize][..]
1145            );
1146        };
1147        const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
1148        join!(
1149            test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
1150            test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
1151        );
1152    }
1153
1154    // Implements dummy server which can be used by test cases to verify whether
1155    // channel messages and fifo operations are being received - by using set_channel_handler or
1156    // set_fifo_hander respectively
1157    struct FakeBlockServer<'a> {
1158        server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
1159        channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
1160        fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
1161    }
1162
1163    impl<'a> FakeBlockServer<'a> {
1164        // Creates a new FakeBlockServer given a channel to listen on.
1165        //
1166        // 'channel_handler' and 'fifo_handler' closures allow for customizing the way how the server
1167        // handles requests received from channel or the fifo respectfully.
1168        //
1169        // 'channel_handler' receives a message before it is handled by the default implementation
1170        // and can return 'true' to indicate all processing is done and no further processing of
1171        // that message is required
1172        //
1173        // 'fifo_handler' takes as input a BlockFifoRequest and produces a response which the
1174        // FakeBlockServer will send over the fifo.
1175        fn new(
1176            server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
1177            channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
1178            fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
1179        ) -> FakeBlockServer<'a> {
1180            FakeBlockServer {
1181                server_channel: Some(server_channel),
1182                channel_handler: Box::new(channel_handler),
1183                fifo_handler: Box::new(fifo_handler),
1184            }
1185        }
1186
1187        // Runs the server
1188        async fn run(&mut self) {
1189            let server = self.server_channel.take().unwrap();
1190
1191            // Set up a mock server.
1192            let (server_fifo, client_fifo) =
1193                zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
1194                    .expect("Fifo::create failed");
1195            let maybe_server_fifo = fuchsia_sync::Mutex::new(Some(client_fifo));
1196
1197            let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
1198            let fifo_future = Abortable::new(
1199                async {
1200                    let mut fifo = fasync::Fifo::from_fifo(server_fifo);
1201                    let (mut reader, mut writer) = fifo.async_io();
1202                    let mut request = BlockFifoRequest::default();
1203                    loop {
1204                        match reader.read_entries(&mut request).await {
1205                            Ok(1) => {}
1206                            Err(zx::Status::PEER_CLOSED) => break,
1207                            Err(e) => panic!("read_entry failed {:?}", e),
1208                            _ => unreachable!(),
1209                        };
1210
1211                        let response = self.fifo_handler.as_ref()(request);
1212                        writer
1213                            .write_entries(std::slice::from_ref(&response))
1214                            .await
1215                            .expect("write_entries failed");
1216                    }
1217                },
1218                fifo_future_abort_registration,
1219            );
1220
1221            let channel_future = async {
1222                server
1223                    .into_stream()
1224                    .for_each_concurrent(None, |request| async {
1225                        let request = request.expect("unexpected fidl error");
1226
1227                        match request {
1228                            block::BlockRequest::GetInfo { responder } => {
1229                                responder
1230                                    .send(Ok(&block::BlockInfo {
1231                                        block_count: 1024,
1232                                        block_size: 512,
1233                                        max_transfer_size: 1024 * 1024,
1234                                        flags: block::DeviceFlag::empty(),
1235                                    }))
1236                                    .expect("send failed");
1237                            }
1238                            block::BlockRequest::OpenSession { session, control_handle: _ } => {
1239                                let stream = session.into_stream();
1240                                stream
1241                                    .for_each(|request| async {
1242                                        let request = request.expect("unexpected fidl error");
1243                                        // Give a chance for the test to register and potentially
1244                                        // handle the event
1245                                        if self.channel_handler.as_ref()(&request) {
1246                                            return;
1247                                        }
1248                                        match request {
1249                                            block::SessionRequest::GetFifo { responder } => {
1250                                                match maybe_server_fifo.lock().take() {
1251                                                    Some(fifo) => {
1252                                                        responder.send(Ok(fifo.downcast()))
1253                                                    }
1254                                                    None => responder.send(Err(
1255                                                        zx::Status::NO_RESOURCES.into_raw(),
1256                                                    )),
1257                                                }
1258                                                .expect("send failed")
1259                                            }
1260                                            block::SessionRequest::AttachVmo {
1261                                                vmo: _,
1262                                                responder,
1263                                            } => responder
1264                                                .send(Ok(&block::VmoId { id: 1 }))
1265                                                .expect("send failed"),
1266                                            block::SessionRequest::Close { responder } => {
1267                                                fifo_future_abort.abort();
1268                                                responder.send(Ok(())).expect("send failed")
1269                                            }
1270                                        }
1271                                    })
1272                                    .await
1273                            }
1274                            _ => panic!("Unexpected message"),
1275                        }
1276                    })
1277                    .await;
1278            };
1279
1280            let _result = join!(fifo_future, channel_future);
1281            //_result can be Err(Aborted) since FifoClose calls .abort but that's expected
1282        }
1283    }
1284
1285    #[fuchsia::test]
1286    async fn test_block_close_is_called() {
1287        let close_called = fuchsia_sync::Mutex::new(false);
1288        let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();
1289
1290        std::thread::spawn(move || {
1291            let _block_client =
1292                RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
1293            // The drop here should cause Close to be sent.
1294        });
1295
1296        let channel_handler = |request: &block::SessionRequest| -> bool {
1297            if let block::SessionRequest::Close { .. } = request {
1298                *close_called.lock() = true;
1299            }
1300            false
1301        };
1302        FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;
1303
1304        // After the server has finished running, we can check to see that close was called.
1305        assert!(*close_called.lock());
1306    }
1307
1308    #[fuchsia::test]
1309    async fn test_block_flush_is_called() {
1310        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();
1311
1312        struct Interface {
1313            flush_called: Arc<AtomicBool>,
1314        }
1315        impl block_server::async_interface::Interface for Interface {
1316            fn get_info(&self) -> Cow<'_, DeviceInfo> {
1317                Cow::Owned(DeviceInfo::Partition(PartitionInfo {
1318                    device_flags: fidl_fuchsia_storage_block::DeviceFlag::empty(),
1319                    max_transfer_blocks: None,
1320                    block_range: Some(0..1000),
1321                    type_guid: [0; 16],
1322                    instance_guid: [0; 16],
1323                    name: "foo".to_string(),
1324                    flags: 0,
1325                }))
1326            }
1327
1328            async fn read(
1329                &self,
1330                _device_block_offset: u64,
1331                _block_count: u32,
1332                _vmo: &Arc<zx::Vmo>,
1333                _vmo_offset: u64,
1334                _opts: ReadOptions,
1335                _trace_flow_id: Option<NonZero<u64>>,
1336            ) -> Result<(), zx::Status> {
1337                unreachable!();
1338            }
1339
1340            async fn write(
1341                &self,
1342                _device_block_offset: u64,
1343                _block_count: u32,
1344                _vmo: &Arc<zx::Vmo>,
1345                _vmo_offset: u64,
1346                _opts: WriteOptions,
1347                _trace_flow_id: Option<NonZero<u64>>,
1348            ) -> Result<(), zx::Status> {
1349                unreachable!();
1350            }
1351
1352            async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1353                self.flush_called.store(true, Ordering::Relaxed);
1354                Ok(())
1355            }
1356
1357            async fn trim(
1358                &self,
1359                _device_block_offset: u64,
1360                _block_count: u32,
1361                _trace_flow_id: Option<NonZero<u64>>,
1362            ) -> Result<(), zx::Status> {
1363                unreachable!();
1364            }
1365        }
1366
1367        let flush_called = Arc::new(AtomicBool::new(false));
1368
1369        futures::join!(
1370            async {
1371                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1372
1373                block_client.flush().await.expect("flush failed");
1374            },
1375            async {
1376                let block_server = BlockServer::new(
1377                    512,
1378                    Arc::new(Interface { flush_called: flush_called.clone() }),
1379                );
1380                block_server.handle_requests(stream.cast_stream()).await.unwrap();
1381            }
1382        );
1383
1384        assert!(flush_called.load(Ordering::Relaxed));
1385    }
1386
1387    #[fuchsia::test]
1388    async fn test_trace_flow_ids_set() {
1389        let (proxy, server) = fidl::endpoints::create_proxy();
1390
1391        futures::join!(
1392            async {
1393                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1394                block_client.flush().await.expect("flush failed");
1395            },
1396            async {
1397                let flow_id: fuchsia_sync::Mutex<Option<u64>> = fuchsia_sync::Mutex::new(None);
1398                let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
1399                    if request.trace_flow_id > 0 {
1400                        *flow_id.lock() = Some(request.trace_flow_id);
1401                    }
1402                    BlockFifoResponse {
1403                        status: zx::Status::OK.into_raw(),
1404                        reqid: request.reqid,
1405                        ..Default::default()
1406                    }
1407                };
1408                FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
1409                // After the server has finished running, verify the trace flow ID was set to some value.
1410                assert!(flow_id.lock().is_some());
1411            }
1412        );
1413    }
1414}