Skip to main content

block_client/
lib.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! A Rust client library for the Fuchsia block protocol.
6//!
7//! This crate provides a low-level client for interacting with block devices over the
8//! `fuchsia.hardware.block` FIDL protocol and the FIFO interface for block I/O.
9//!
10//! See the [`BlockClient`] trait.
11
12use fidl_fuchsia_storage_block as block;
13use fidl_fuchsia_storage_block::{MAX_TRANSFER_UNBOUNDED, VMOID_INVALID};
14use fuchsia_async as fasync;
15use fuchsia_sync::Mutex;
16use futures::channel::oneshot;
17use futures::executor::block_on;
18use std::borrow::Borrow;
19use std::collections::HashMap;
20use std::future::Future;
21use std::hash::{Hash, Hasher};
22use std::mem::MaybeUninit;
23use std::num::NonZero;
24use std::ops::{DerefMut, Range};
25use std::pin::Pin;
26use std::sync::atomic::{AtomicU16, Ordering};
27use std::sync::{Arc, LazyLock};
28use std::task::{Context, Poll, Waker};
29use storage_trace as trace;
30use zx::sys::zx_handle_t;
31
32pub use cache::Cache;
33
34pub use block::DeviceFlag as BlockDeviceFlag;
35
36pub use block_protocol::*;
37
38pub mod cache;
39
40const TEMP_VMO_SIZE: usize = 65536;
41
42/// If a trace flow ID isn't specified for requests, one will be generated.
43pub const NO_TRACE_ID: u64 = 0;
44
45pub use fidl_fuchsia_storage_block::{BlockIoFlag, BlockOpcode};
46
47fn fidl_to_status(error: fidl::Error) -> zx::Status {
48    match error {
49        fidl::Error::ClientChannelClosed { status, .. } => status,
50        _ => zx::Status::INTERNAL,
51    }
52}
53
54fn opcode_str(opcode: u8) -> &'static str {
55    match BlockOpcode::from_primitive(opcode) {
56        Some(BlockOpcode::Read) => "read",
57        Some(BlockOpcode::Write) => "write",
58        Some(BlockOpcode::Flush) => "flush",
59        Some(BlockOpcode::Trim) => "trim",
60        Some(BlockOpcode::CloseVmo) => "close_vmo",
61        None => "unknown",
62    }
63}
64
65// Generates a trace ID that will be unique across the system (as long as |request_id| isn't
66// reused within this process).
67fn generate_trace_flow_id(request_id: u32) -> u64 {
68    static SELF_HANDLE: LazyLock<zx_handle_t> =
69        LazyLock::new(|| fuchsia_runtime::process_self().raw_handle());
70    *SELF_HANDLE as u64 + (request_id as u64) << 32
71}
72
73pub enum BufferSlice<'a> {
74    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
75    Memory(&'a [u8]),
76}
77
78impl<'a> BufferSlice<'a> {
79    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
80        BufferSlice::VmoId { vmo_id, offset, length }
81    }
82}
83
84impl<'a> From<&'a [u8]> for BufferSlice<'a> {
85    fn from(buf: &'a [u8]) -> Self {
86        BufferSlice::Memory(buf)
87    }
88}
89
90pub enum MutableBufferSlice<'a> {
91    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
92    Memory(&'a mut [u8]),
93}
94
95impl<'a> MutableBufferSlice<'a> {
96    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
97        MutableBufferSlice::VmoId { vmo_id, offset, length }
98    }
99}
100
101impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
102    fn from(buf: &'a mut [u8]) -> Self {
103        MutableBufferSlice::Memory(buf)
104    }
105}
106
107#[derive(Default)]
108struct RequestState {
109    result: Option<zx::Status>,
110    waker: Option<Waker>,
111}
112
113#[derive(Default)]
114struct FifoState {
115    // The fifo.
116    fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,
117
118    // The next request ID to be used.
119    next_request_id: u32,
120
121    // A queue of messages to be sent on the fifo.
122    queue: std::collections::VecDeque<BlockFifoRequest>,
123
124    // Map from request ID to RequestState.
125    map: HashMap<u32, RequestState>,
126
127    // The waker for the FifoPoller.
128    poller_waker: Option<Waker>,
129
130    // If set, attach a barrier to the next write request
131    attach_barrier: bool,
132}
133
134impl FifoState {
135    fn terminate(&mut self) {
136        self.fifo.take();
137        for (_, request_state) in self.map.iter_mut() {
138            request_state.result.get_or_insert(zx::Status::CANCELED);
139            if let Some(waker) = request_state.waker.take() {
140                waker.wake();
141            }
142        }
143        if let Some(waker) = self.poller_waker.take() {
144            waker.wake();
145        }
146    }
147
148    // Returns true if polling should be terminated.
149    fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
150        let fifo = if let Some(fifo) = self.fifo.as_ref() {
151            fifo
152        } else {
153            return true;
154        };
155
156        loop {
157            let slice = self.queue.as_slices().0;
158            if slice.is_empty() {
159                return false;
160            }
161            match fifo.try_write(context, slice) {
162                Poll::Ready(Ok(sent)) => {
163                    self.queue.drain(0..sent);
164                }
165                Poll::Ready(Err(_)) => {
166                    self.terminate();
167                    return true;
168                }
169                Poll::Pending => {
170                    return false;
171                }
172            }
173        }
174    }
175}
176
177type FifoStateRef = Arc<Mutex<FifoState>>;
178
179// A future used for fifo responses.
180struct ResponseFuture {
181    request_id: u32,
182    fifo_state: FifoStateRef,
183}
184
185impl ResponseFuture {
186    fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
187        ResponseFuture { request_id, fifo_state }
188    }
189}
190
191impl Future for ResponseFuture {
192    type Output = Result<(), zx::Status>;
193
194    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
195        let mut state = self.fifo_state.lock();
196        let request_state = state.map.get_mut(&self.request_id).unwrap();
197        if let Some(result) = request_state.result {
198            Poll::Ready(result.into())
199        } else {
200            request_state.waker.replace(context.waker().clone());
201            Poll::Pending
202        }
203    }
204}
205
206impl Drop for ResponseFuture {
207    fn drop(&mut self) {
208        let mut state = self.fifo_state.lock();
209        state.map.remove(&self.request_id).unwrap();
210        update_outstanding_requests_counter(state.map.len());
211    }
212}
213
214/// Wraps a vmo-id. Will panic if you forget to detach.
215#[derive(Debug)]
216#[must_use]
217pub struct VmoId(AtomicU16);
218
219impl VmoId {
220    /// VmoIds will normally be vended by attach_vmo, but this might be used in some tests
221    pub fn new(id: u16) -> Self {
222        Self(AtomicU16::new(id))
223    }
224
225    /// Invalidates self and returns a new VmoId with the same underlying ID.
226    pub fn take(&self) -> Self {
227        Self(AtomicU16::new(self.0.swap(VMOID_INVALID, Ordering::Relaxed)))
228    }
229
230    pub fn is_valid(&self) -> bool {
231        self.id() != VMOID_INVALID
232    }
233
234    /// Takes the ID.  The caller assumes responsibility for detaching.
235    #[must_use]
236    pub fn into_id(self) -> u16 {
237        self.0.swap(VMOID_INVALID, Ordering::Relaxed)
238    }
239
240    pub fn id(&self) -> u16 {
241        self.0.load(Ordering::Relaxed)
242    }
243}
244
245impl PartialEq for VmoId {
246    fn eq(&self, other: &Self) -> bool {
247        self.id() == other.id()
248    }
249}
250
251impl Eq for VmoId {}
252
253impl Drop for VmoId {
254    fn drop(&mut self) {
255        assert_eq!(self.0.load(Ordering::Relaxed), VMOID_INVALID, "Did you forget to detach?");
256    }
257}
258
259impl Hash for VmoId {
260    fn hash<H: Hasher>(&self, state: &mut H) {
261        self.id().hash(state);
262    }
263}
264
265/// Represents a client connection to a block device. This is a simplified version of the block.fidl
266/// interface.
267/// Most users will use the RemoteBlockClient instantiation of this trait.
268pub trait BlockClient: Send + Sync {
269    /// Wraps AttachVmo from fuchsia.hardware.block::Block.
270    fn attach_vmo(&self, vmo: &zx::Vmo) -> impl Future<Output = Result<VmoId, zx::Status>> + Send;
271
272    /// Detaches the given vmo-id from the device.
273    fn detach_vmo(&self, vmo_id: VmoId) -> impl Future<Output = Result<(), zx::Status>> + Send;
274
275    /// Reads from the device at |device_offset| into the given buffer slice.
276    fn read_at(
277        &self,
278        buffer_slice: MutableBufferSlice<'_>,
279        device_offset: u64,
280    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
281        self.read_at_with_opts_traced(buffer_slice, device_offset, ReadOptions::default(), 0)
282    }
283
284    fn read_at_with_opts(
285        &self,
286        buffer_slice: MutableBufferSlice<'_>,
287        device_offset: u64,
288        opts: ReadOptions,
289    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
290        self.read_at_with_opts_traced(buffer_slice, device_offset, opts, 0)
291    }
292
293    fn read_at_with_opts_traced(
294        &self,
295        buffer_slice: MutableBufferSlice<'_>,
296        device_offset: u64,
297        opts: ReadOptions,
298        trace_flow_id: u64,
299    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
300
301    /// Writes the data in |buffer_slice| to the device.
302    fn write_at(
303        &self,
304        buffer_slice: BufferSlice<'_>,
305        device_offset: u64,
306    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
307        self.write_at_with_opts_traced(
308            buffer_slice,
309            device_offset,
310            WriteOptions::default(),
311            NO_TRACE_ID,
312        )
313    }
314
315    fn write_at_with_opts(
316        &self,
317        buffer_slice: BufferSlice<'_>,
318        device_offset: u64,
319        opts: WriteOptions,
320    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
321        self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID)
322    }
323
324    fn write_at_with_opts_traced(
325        &self,
326        buffer_slice: BufferSlice<'_>,
327        device_offset: u64,
328        opts: WriteOptions,
329        trace_flow_id: u64,
330    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
331
332    /// Trims the given range on the block device.
333    fn trim(
334        &self,
335        device_range: Range<u64>,
336    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
337        self.trim_traced(device_range, NO_TRACE_ID)
338    }
339
340    fn trim_traced(
341        &self,
342        device_range: Range<u64>,
343        trace_flow_id: u64,
344    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
345
346    /// Attaches a barrier to the next write sent to the underlying block device. This barrier
347    /// method is an alternative to setting the WriteOption::PRE_BARRIER on `write_at_with_opts`.
348    /// This method makes it easier to guarantee that the barrier is attached to the correct
349    /// write operation when subsequent write operations can get reordered.
350    fn barrier(&self);
351
352    fn flush(&self) -> impl Future<Output = Result<(), zx::Status>> + Send {
353        self.flush_traced(NO_TRACE_ID)
354    }
355
356    /// Sends a flush request to the underlying block device.
357    fn flush_traced(
358        &self,
359        trace_flow_id: u64,
360    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
361
362    /// Closes the fifo.
363    fn close(&self) -> impl Future<Output = Result<(), zx::Status>> + Send;
364
365    /// Returns the block size of the device.
366    fn block_size(&self) -> u32;
367
368    /// Returns the size, in blocks, of the device.
369    fn block_count(&self) -> u64;
370
371    /// Returns the maximum number of blocks which can be transferred in a single request.
372    fn max_transfer_blocks(&self) -> Option<NonZero<u32>>;
373
374    /// Returns the block flags reported by the device.
375    fn block_flags(&self) -> BlockDeviceFlag;
376
377    /// Returns true if the remote fifo is still connected.
378    fn is_connected(&self) -> bool;
379}
380
381struct Common {
382    block_size: u32,
383    block_count: u64,
384    max_transfer_blocks: Option<NonZero<u32>>,
385    block_flags: BlockDeviceFlag,
386    fifo_state: FifoStateRef,
387    temp_vmo: futures::lock::Mutex<zx::Vmo>,
388    temp_vmo_id: VmoId,
389}
390
391impl Common {
392    fn new(
393        fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
394        info: &block::BlockInfo,
395        temp_vmo: zx::Vmo,
396        temp_vmo_id: VmoId,
397    ) -> Self {
398        let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
399        fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
400        Self {
401            block_size: info.block_size,
402            block_count: info.block_count,
403            max_transfer_blocks: if info.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
404                NonZero::new(info.max_transfer_size / info.block_size)
405            } else {
406                None
407            },
408            block_flags: info.flags,
409            fifo_state,
410            temp_vmo: futures::lock::Mutex::new(temp_vmo),
411            temp_vmo_id,
412        }
413    }
414
415    fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
416        if bytes % self.block_size as u64 != 0 {
417            Err(zx::Status::INVALID_ARGS)
418        } else {
419            Ok(bytes / self.block_size as u64)
420        }
421    }
422
423    // Sends the request and waits for the response.
424    async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
425        let (request_id, trace_flow_id) = {
426            let mut state = self.fifo_state.lock();
427
428            let mut flags = BlockIoFlag::from_bits_retain(request.command.flags);
429            if BlockOpcode::from_primitive(request.command.opcode) == Some(BlockOpcode::Write)
430                && state.attach_barrier
431            {
432                flags |= BlockIoFlag::PRE_BARRIER;
433                request.command.flags = flags.bits();
434                state.attach_barrier = false;
435            }
436
437            if state.fifo.is_none() {
438                // Fifo has been closed.
439                return Err(zx::Status::CANCELED);
440            }
441            trace::duration!(
442                "storage",
443                "block_client::send::start",
444                "op" => opcode_str(request.command.opcode),
445                "len" => request.length * self.block_size
446            );
447            let request_id = state.next_request_id;
448            state.next_request_id = state.next_request_id.overflowing_add(1).0;
449            assert!(
450                state.map.insert(request_id, RequestState::default()).is_none(),
451                "request id in use!"
452            );
453            update_outstanding_requests_counter(state.map.len());
454            request.reqid = request_id;
455            if request.trace_flow_id == NO_TRACE_ID {
456                request.trace_flow_id = generate_trace_flow_id(request_id);
457            }
458            let trace_flow_id = request.trace_flow_id;
459            trace::flow_begin!("storage", "block_client::send", trace_flow_id.into());
460            state.queue.push_back(request);
461            if let Some(waker) = state.poller_waker.clone() {
462                state.poll_send_requests(&mut Context::from_waker(&waker));
463            }
464            (request_id, trace_flow_id)
465        };
466        ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
467        trace::duration!("storage", "block_client::send::end");
468        trace::flow_end!("storage", "block_client::send", trace_flow_id.into());
469        Ok(())
470    }
471
472    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
473        self.send(BlockFifoRequest {
474            command: BlockFifoCommand {
475                opcode: BlockOpcode::CloseVmo.into_primitive(),
476                flags: 0,
477                ..Default::default()
478            },
479            vmoid: vmo_id.into_id(),
480            ..Default::default()
481        })
482        .await
483    }
484
485    async fn read_at(
486        &self,
487        buffer_slice: MutableBufferSlice<'_>,
488        device_offset: u64,
489        opts: ReadOptions,
490        trace_flow_id: u64,
491    ) -> Result<(), zx::Status> {
492        let mut flags = BlockIoFlag::empty();
493
494        if opts.inline_crypto.is_enabled {
495            flags |= BlockIoFlag::INLINE_ENCRYPTION_ENABLED;
496        }
497
498        match buffer_slice {
499            MutableBufferSlice::VmoId { vmo_id, offset, length } => {
500                self.send(BlockFifoRequest {
501                    command: BlockFifoCommand {
502                        opcode: BlockOpcode::Read.into_primitive(),
503                        flags: flags.bits(),
504                        ..Default::default()
505                    },
506                    vmoid: vmo_id.id(),
507                    length: self
508                        .to_blocks(length)?
509                        .try_into()
510                        .map_err(|_| zx::Status::INVALID_ARGS)?,
511                    vmo_offset: self.to_blocks(offset)?,
512                    dev_offset: self.to_blocks(device_offset)?,
513                    trace_flow_id,
514                    dun: opts.inline_crypto.dun,
515                    slot: opts.inline_crypto.slot,
516                    ..Default::default()
517                })
518                .await?
519            }
520            MutableBufferSlice::Memory(mut slice) => {
521                let temp_vmo = self.temp_vmo.lock().await;
522                let mut device_block = self.to_blocks(device_offset)?;
523                loop {
524                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
525                    let block_count = self.to_blocks(to_do as u64)? as u32;
526                    self.send(BlockFifoRequest {
527                        command: BlockFifoCommand {
528                            opcode: BlockOpcode::Read.into_primitive(),
529                            flags: flags.bits(),
530                            ..Default::default()
531                        },
532                        vmoid: self.temp_vmo_id.id(),
533                        length: block_count,
534                        vmo_offset: 0,
535                        dev_offset: device_block,
536                        trace_flow_id,
537                        dun: opts.inline_crypto.dun,
538                        slot: opts.inline_crypto.slot,
539                        ..Default::default()
540                    })
541                    .await?;
542                    temp_vmo.read(&mut slice[..to_do], 0)?;
543                    if to_do == slice.len() {
544                        break;
545                    }
546                    device_block += block_count as u64;
547                    slice = &mut slice[to_do..];
548                }
549            }
550        }
551        Ok(())
552    }
553
554    async fn write_at(
555        &self,
556        buffer_slice: BufferSlice<'_>,
557        device_offset: u64,
558        opts: WriteOptions,
559        trace_flow_id: u64,
560    ) -> Result<(), zx::Status> {
561        let mut flags = BlockIoFlag::empty();
562
563        if opts.flags.contains(WriteFlags::FORCE_ACCESS) {
564            flags |= BlockIoFlag::FORCE_ACCESS;
565        }
566
567        if opts.flags.contains(WriteFlags::PRE_BARRIER) {
568            flags |= BlockIoFlag::PRE_BARRIER;
569        }
570
571        if opts.inline_crypto.is_enabled {
572            flags |= BlockIoFlag::INLINE_ENCRYPTION_ENABLED;
573        }
574
575        match buffer_slice {
576            BufferSlice::VmoId { vmo_id, offset, length } => {
577                self.send(BlockFifoRequest {
578                    command: BlockFifoCommand {
579                        opcode: BlockOpcode::Write.into_primitive(),
580                        flags: flags.bits(),
581                        ..Default::default()
582                    },
583                    vmoid: vmo_id.id(),
584                    length: self
585                        .to_blocks(length)?
586                        .try_into()
587                        .map_err(|_| zx::Status::INVALID_ARGS)?,
588                    vmo_offset: self.to_blocks(offset)?,
589                    dev_offset: self.to_blocks(device_offset)?,
590                    trace_flow_id,
591                    dun: opts.inline_crypto.dun,
592                    slot: opts.inline_crypto.slot,
593                    ..Default::default()
594                })
595                .await?;
596            }
597            BufferSlice::Memory(mut slice) => {
598                let temp_vmo = self.temp_vmo.lock().await;
599                let mut device_block = self.to_blocks(device_offset)?;
600                loop {
601                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
602                    let block_count = self.to_blocks(to_do as u64)? as u32;
603                    temp_vmo.write(&slice[..to_do], 0)?;
604                    self.send(BlockFifoRequest {
605                        command: BlockFifoCommand {
606                            opcode: BlockOpcode::Write.into_primitive(),
607                            flags: flags.bits(),
608                            ..Default::default()
609                        },
610                        vmoid: self.temp_vmo_id.id(),
611                        length: block_count,
612                        vmo_offset: 0,
613                        dev_offset: device_block,
614                        trace_flow_id,
615                        dun: opts.inline_crypto.dun,
616                        slot: opts.inline_crypto.slot,
617                        ..Default::default()
618                    })
619                    .await?;
620                    if to_do == slice.len() {
621                        break;
622                    }
623                    device_block += block_count as u64;
624                    slice = &slice[to_do..];
625                }
626            }
627        }
628        Ok(())
629    }
630
631    async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
632        let length = self.to_blocks(device_range.end - device_range.start)? as u32;
633        let dev_offset = self.to_blocks(device_range.start)?;
634        self.send(BlockFifoRequest {
635            command: BlockFifoCommand {
636                opcode: BlockOpcode::Trim.into_primitive(),
637                flags: 0,
638                ..Default::default()
639            },
640            vmoid: VMOID_INVALID,
641            length,
642            dev_offset,
643            trace_flow_id,
644            ..Default::default()
645        })
646        .await
647    }
648
649    async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
650        self.send(BlockFifoRequest {
651            command: BlockFifoCommand {
652                opcode: BlockOpcode::Flush.into_primitive(),
653                flags: 0,
654                ..Default::default()
655            },
656            vmoid: VMOID_INVALID,
657            trace_flow_id,
658            ..Default::default()
659        })
660        .await
661    }
662
663    fn barrier(&self) {
664        self.fifo_state.lock().attach_barrier = true;
665    }
666
667    fn block_size(&self) -> u32 {
668        self.block_size
669    }
670
671    fn block_count(&self) -> u64 {
672        self.block_count
673    }
674
675    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
676        self.max_transfer_blocks.clone()
677    }
678
679    fn block_flags(&self) -> BlockDeviceFlag {
680        self.block_flags
681    }
682
683    fn is_connected(&self) -> bool {
684        self.fifo_state.lock().fifo.is_some()
685    }
686}
687
688impl Drop for Common {
689    fn drop(&mut self) {
690        // It's OK to leak the VMO id because the server will dump all VMOs when the fifo is torn
691        // down.
692        let _ = self.temp_vmo_id.take().into_id();
693        self.fifo_state.lock().terminate();
694    }
695}
696
697// RemoteBlockClient is a BlockClient that communicates with a real block device over FIDL.
698pub struct RemoteBlockClient {
699    session: block::SessionProxy,
700    common: Common,
701}
702
703impl RemoteBlockClient {
704    /// Returns a connection to a remote block device via the given channel.
705    pub async fn new(remote: impl Borrow<block::BlockProxy>) -> Result<Self, zx::Status> {
706        let remote = remote.borrow();
707        let info =
708            remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
709        let (session, server) = fidl::endpoints::create_proxy();
710        let () = remote.open_session(server).map_err(fidl_to_status)?;
711        Self::from_session(info, session).await
712    }
713
714    pub async fn from_session(
715        info: block::BlockInfo,
716        session: block::SessionProxy,
717    ) -> Result<Self, zx::Status> {
718        const SCRATCH_VMO_NAME: zx::Name = zx::Name::new_lossy("block-client-scratch-vmo");
719        let fifo =
720            session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
721        let fifo = fasync::Fifo::from_fifo(fifo);
722        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
723        temp_vmo.set_name(&SCRATCH_VMO_NAME)?;
724        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
725        let vmo_id =
726            session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
727        let vmo_id = VmoId::new(vmo_id.id);
728        Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
729    }
730}
731
732impl BlockClient for RemoteBlockClient {
733    async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
734        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
735        let vmo_id = self
736            .session
737            .attach_vmo(dup)
738            .await
739            .map_err(fidl_to_status)?
740            .map_err(zx::Status::from_raw)?;
741        Ok(VmoId::new(vmo_id.id))
742    }
743
744    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
745        self.common.detach_vmo(vmo_id).await
746    }
747
748    async fn read_at_with_opts_traced(
749        &self,
750        buffer_slice: MutableBufferSlice<'_>,
751        device_offset: u64,
752        opts: ReadOptions,
753        trace_flow_id: u64,
754    ) -> Result<(), zx::Status> {
755        self.common.read_at(buffer_slice, device_offset, opts, trace_flow_id).await
756    }
757
758    async fn write_at_with_opts_traced(
759        &self,
760        buffer_slice: BufferSlice<'_>,
761        device_offset: u64,
762        opts: WriteOptions,
763        trace_flow_id: u64,
764    ) -> Result<(), zx::Status> {
765        self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
766    }
767
768    async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
769        self.common.trim(range, trace_flow_id).await
770    }
771
772    async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
773        self.common.flush(trace_flow_id).await
774    }
775
776    fn barrier(&self) {
777        self.common.barrier()
778    }
779
780    async fn close(&self) -> Result<(), zx::Status> {
781        let () =
782            self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
783        Ok(())
784    }
785
786    fn block_size(&self) -> u32 {
787        self.common.block_size()
788    }
789
790    fn block_count(&self) -> u64 {
791        self.common.block_count()
792    }
793
794    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
795        self.common.max_transfer_blocks()
796    }
797
798    fn block_flags(&self) -> BlockDeviceFlag {
799        self.common.block_flags()
800    }
801
802    fn is_connected(&self) -> bool {
803        self.common.is_connected()
804    }
805}
806
807pub struct RemoteBlockClientSync {
808    session: block::SessionSynchronousProxy,
809    common: Common,
810}
811
812impl RemoteBlockClientSync {
813    /// Returns a connection to a remote block device via the given channel, but spawns a separate
814    /// thread for polling the fifo which makes it work in cases where no executor is configured for
815    /// the calling thread.
816    pub fn new(
817        client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
818    ) -> Result<Self, zx::Status> {
819        let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
820        let info = remote
821            .get_info(zx::MonotonicInstant::INFINITE)
822            .map_err(fidl_to_status)?
823            .map_err(zx::Status::from_raw)?;
824        let (client, server) = fidl::endpoints::create_endpoints();
825        let () = remote.open_session(server).map_err(fidl_to_status)?;
826        let session = block::SessionSynchronousProxy::new(client.into_channel());
827        let fifo = session
828            .get_fifo(zx::MonotonicInstant::INFINITE)
829            .map_err(fidl_to_status)?
830            .map_err(zx::Status::from_raw)?;
831        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
832        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
833        let vmo_id = session
834            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
835            .map_err(fidl_to_status)?
836            .map_err(zx::Status::from_raw)?;
837        let vmo_id = VmoId::new(vmo_id.id);
838
839        // The fifo needs to be instantiated from the thread that has the executor as that's where
840        // the fifo registers for notifications to be delivered.
841        let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
842        std::thread::spawn(move || {
843            let mut executor = fasync::LocalExecutor::default();
844            let fifo = fasync::Fifo::from_fifo(fifo);
845            let common = Common::new(fifo, &info, temp_vmo, vmo_id);
846            let fifo_state = common.fifo_state.clone();
847            let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
848            executor.run_singlethreaded(FifoPoller { fifo_state });
849        });
850        block_on(receiver).map_err(|_| zx::Status::CANCELED)?
851    }
852
853    pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
854        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
855        let vmo_id = self
856            .session
857            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
858            .map_err(fidl_to_status)?
859            .map_err(zx::Status::from_raw)?;
860        Ok(VmoId::new(vmo_id.id))
861    }
862
863    pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
864        block_on(self.common.detach_vmo(vmo_id))
865    }
866
867    pub fn read_at(
868        &self,
869        buffer_slice: MutableBufferSlice<'_>,
870        device_offset: u64,
871    ) -> Result<(), zx::Status> {
872        block_on(self.common.read_at(
873            buffer_slice,
874            device_offset,
875            ReadOptions::default(),
876            NO_TRACE_ID,
877        ))
878    }
879
880    pub fn write_at(
881        &self,
882        buffer_slice: BufferSlice<'_>,
883        device_offset: u64,
884    ) -> Result<(), zx::Status> {
885        block_on(self.common.write_at(
886            buffer_slice,
887            device_offset,
888            WriteOptions::default(),
889            NO_TRACE_ID,
890        ))
891    }
892
893    pub fn flush(&self) -> Result<(), zx::Status> {
894        block_on(self.common.flush(NO_TRACE_ID))
895    }
896
897    pub fn close(&self) -> Result<(), zx::Status> {
898        let () = self
899            .session
900            .close(zx::MonotonicInstant::INFINITE)
901            .map_err(fidl_to_status)?
902            .map_err(zx::Status::from_raw)?;
903        Ok(())
904    }
905
906    pub fn block_size(&self) -> u32 {
907        self.common.block_size()
908    }
909
910    pub fn block_count(&self) -> u64 {
911        self.common.block_count()
912    }
913
914    pub fn is_connected(&self) -> bool {
915        self.common.is_connected()
916    }
917}
918
919impl Drop for RemoteBlockClientSync {
920    fn drop(&mut self) {
921        // Ignore errors here as there is not much we can do about it.
922        let _ = self.close();
923    }
924}
925
926// FifoPoller is a future responsible for sending and receiving from the fifo.
927struct FifoPoller {
928    fifo_state: FifoStateRef,
929}
930
931impl Future for FifoPoller {
932    type Output = ();
933
934    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
935        let mut state_lock = self.fifo_state.lock();
936        let state = state_lock.deref_mut(); // So that we can split the borrow.
937
938        // Send requests.
939        if state.poll_send_requests(context) {
940            return Poll::Ready(());
941        }
942
943        // Receive responses.
944        let fifo = state.fifo.as_ref().unwrap(); // Safe because poll_send_requests checks.
945        loop {
946            let mut response = MaybeUninit::uninit();
947            match fifo.try_read(context, &mut response) {
948                Poll::Pending => {
949                    state.poller_waker = Some(context.waker().clone());
950                    return Poll::Pending;
951                }
952                Poll::Ready(Ok(_)) => {
953                    let response = unsafe { response.assume_init() };
954                    let request_id = response.reqid;
955                    // If the request isn't in the map, assume that it's a cancelled read.
956                    if let Some(request_state) = state.map.get_mut(&request_id) {
957                        request_state.result.replace(zx::Status::from_raw(response.status));
958                        if let Some(waker) = request_state.waker.take() {
959                            waker.wake();
960                        }
961                    }
962                }
963                Poll::Ready(Err(_)) => {
964                    state.terminate();
965                    return Poll::Ready(());
966                }
967            }
968        }
969    }
970}
971
972fn update_outstanding_requests_counter(outstanding: usize) {
973    trace::counter!("storage", "block-requests", 0, "outstanding" => outstanding);
974}
975
976#[cfg(test)]
977mod tests {
978    use super::{
979        BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
980        RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
981    };
982    use block_protocol::ReadOptions;
983    use block_server::{BlockServer, DeviceInfo, PartitionInfo};
984    use fidl::endpoints::RequestStream as _;
985    use fidl_fuchsia_storage_block as block;
986    use fuchsia_async as fasync;
987    use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
988    use futures::join;
989    use futures::stream::StreamExt as _;
990    use futures::stream::futures_unordered::FuturesUnordered;
991    use ramdevice_client::RamdiskClient;
992    use std::borrow::Cow;
993    use std::num::NonZero;
994    use std::sync::Arc;
995    use std::sync::atomic::{AtomicBool, Ordering};
996
997    const RAMDISK_BLOCK_SIZE: u64 = 1024;
998    const RAMDISK_BLOCK_COUNT: u64 = 1024;
999
1000    pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
1001        let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
1002            .await
1003            .expect("RamdiskClient::create failed");
1004        let client_end = ramdisk.open().expect("ramdisk.open failed");
1005        let proxy = client_end.into_proxy();
1006        let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1007        assert_eq!(block_client.block_size(), 1024);
1008        let client_end = ramdisk.open().expect("ramdisk.open failed");
1009        let proxy = client_end.into_proxy();
1010        (ramdisk, proxy, block_client)
1011    }
1012
1013    #[fuchsia::test]
1014    async fn test_against_ram_disk() {
1015        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1016
1017        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1018        vmo.write(b"hello", 5).expect("vmo.write failed");
1019        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1020        block_client
1021            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1022            .await
1023            .expect("write_at failed");
1024        block_client
1025            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
1026            .await
1027            .expect("read_at failed");
1028        let mut buf: [u8; 5] = Default::default();
1029        vmo.read(&mut buf, 1029).expect("vmo.read failed");
1030        assert_eq!(&buf, b"hello");
1031        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1032    }
1033
1034    #[fuchsia::test]
1035    async fn test_alignment() {
1036        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1037        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1038        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1039        block_client
1040            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
1041            .await
1042            .expect_err("expected failure due to bad alignment");
1043        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1044    }
1045
1046    #[fuchsia::test]
1047    async fn test_parallel_io() {
1048        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1049        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1050        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1051        let mut reads = Vec::new();
1052        for _ in 0..1024 {
1053            reads.push(
1054                block_client
1055                    .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1056                    .inspect_err(|e| panic!("read should have succeeded: {}", e)),
1057            );
1058        }
1059        futures::future::join_all(reads).await;
1060        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1061    }
1062
1063    #[fuchsia::test]
1064    async fn test_closed_device() {
1065        let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1066        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1067        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1068        let mut reads = Vec::new();
1069        for _ in 0..1024 {
1070            reads.push(
1071                block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1072            );
1073        }
1074        assert!(block_client.is_connected());
1075        let _ = futures::join!(futures::future::join_all(reads), async {
1076            std::mem::drop(ramdisk);
1077        });
1078        // Destroying the ramdisk is asynchronous. Keep issuing reads until they start failing.
1079        while block_client
1080            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1081            .await
1082            .is_ok()
1083        {}
1084
1085        // Sometimes the FIFO will start rejecting requests before FIFO is actually closed, so we
1086        // get false-positives from is_connected.
1087        while block_client.is_connected() {
1088            // Sleep for a bit to minimise lock contention.
1089            fasync::Timer::new(fasync::MonotonicInstant::after(
1090                zx::MonotonicDuration::from_millis(500),
1091            ))
1092            .await;
1093        }
1094
1095        // But once is_connected goes negative, it should stay negative.
1096        assert_eq!(block_client.is_connected(), false);
1097        let _ = block_client.detach_vmo(vmo_id).await;
1098    }
1099
1100    #[fuchsia::test]
1101    async fn test_cancelled_reads() {
1102        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1103        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1104        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1105        {
1106            let mut reads = FuturesUnordered::new();
1107            for _ in 0..1024 {
1108                reads.push(
1109                    block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1110                );
1111            }
1112            // Read the first 500 results and then dump the rest.
1113            for _ in 0..500 {
1114                reads.next().await;
1115            }
1116        }
1117        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1118    }
1119
1120    #[fuchsia::test]
1121    async fn test_parallel_large_read_and_write_with_memory_succeds() {
1122        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1123        let block_client_ref = &block_client;
1124        let test_one = |offset, len, fill| async move {
1125            let buf = vec![fill; len];
1126            block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
1127            // Read back an extra block either side.
1128            let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
1129            block_client_ref
1130                .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
1131                .await
1132                .expect("read_at failed");
1133            assert_eq!(
1134                &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
1135                &[0; RAMDISK_BLOCK_SIZE as usize][..]
1136            );
1137            assert_eq!(
1138                &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
1139                &buf[..]
1140            );
1141            assert_eq!(
1142                &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
1143                &[0; RAMDISK_BLOCK_SIZE as usize][..]
1144            );
1145        };
1146        const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
1147        join!(
1148            test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
1149            test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
1150        );
1151    }
1152
1153    // Implements dummy server which can be used by test cases to verify whether
1154    // channel messages and fifo operations are being received - by using set_channel_handler or
1155    // set_fifo_hander respectively
1156    struct FakeBlockServer<'a> {
1157        server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
1158        channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
1159        fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
1160    }
1161
1162    impl<'a> FakeBlockServer<'a> {
1163        // Creates a new FakeBlockServer given a channel to listen on.
1164        //
1165        // 'channel_handler' and 'fifo_handler' closures allow for customizing the way how the server
1166        // handles requests received from channel or the fifo respectfully.
1167        //
1168        // 'channel_handler' receives a message before it is handled by the default implementation
1169        // and can return 'true' to indicate all processing is done and no further processing of
1170        // that message is required
1171        //
1172        // 'fifo_handler' takes as input a BlockFifoRequest and produces a response which the
1173        // FakeBlockServer will send over the fifo.
1174        fn new(
1175            server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
1176            channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
1177            fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
1178        ) -> FakeBlockServer<'a> {
1179            FakeBlockServer {
1180                server_channel: Some(server_channel),
1181                channel_handler: Box::new(channel_handler),
1182                fifo_handler: Box::new(fifo_handler),
1183            }
1184        }
1185
1186        // Runs the server
1187        async fn run(&mut self) {
1188            let server = self.server_channel.take().unwrap();
1189
1190            // Set up a mock server.
1191            let (server_fifo, client_fifo) =
1192                zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
1193                    .expect("Fifo::create failed");
1194            let maybe_server_fifo = fuchsia_sync::Mutex::new(Some(client_fifo));
1195
1196            let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
1197            let fifo_future = Abortable::new(
1198                async {
1199                    let mut fifo = fasync::Fifo::from_fifo(server_fifo);
1200                    let (mut reader, mut writer) = fifo.async_io();
1201                    let mut request = BlockFifoRequest::default();
1202                    loop {
1203                        match reader.read_entries(&mut request).await {
1204                            Ok(1) => {}
1205                            Err(zx::Status::PEER_CLOSED) => break,
1206                            Err(e) => panic!("read_entry failed {:?}", e),
1207                            _ => unreachable!(),
1208                        };
1209
1210                        let response = self.fifo_handler.as_ref()(request);
1211                        writer
1212                            .write_entries(std::slice::from_ref(&response))
1213                            .await
1214                            .expect("write_entries failed");
1215                    }
1216                },
1217                fifo_future_abort_registration,
1218            );
1219
1220            let channel_future = async {
1221                server
1222                    .into_stream()
1223                    .for_each_concurrent(None, |request| async {
1224                        let request = request.expect("unexpected fidl error");
1225
1226                        match request {
1227                            block::BlockRequest::GetInfo { responder } => {
1228                                responder
1229                                    .send(Ok(&block::BlockInfo {
1230                                        block_count: 1024,
1231                                        block_size: 512,
1232                                        max_transfer_size: 1024 * 1024,
1233                                        flags: block::DeviceFlag::empty(),
1234                                    }))
1235                                    .expect("send failed");
1236                            }
1237                            block::BlockRequest::OpenSession { session, control_handle: _ } => {
1238                                let stream = session.into_stream();
1239                                stream
1240                                    .for_each(|request| async {
1241                                        let request = request.expect("unexpected fidl error");
1242                                        // Give a chance for the test to register and potentially
1243                                        // handle the event
1244                                        if self.channel_handler.as_ref()(&request) {
1245                                            return;
1246                                        }
1247                                        match request {
1248                                            block::SessionRequest::GetFifo { responder } => {
1249                                                match maybe_server_fifo.lock().take() {
1250                                                    Some(fifo) => {
1251                                                        responder.send(Ok(fifo.downcast()))
1252                                                    }
1253                                                    None => responder.send(Err(
1254                                                        zx::Status::NO_RESOURCES.into_raw(),
1255                                                    )),
1256                                                }
1257                                                .expect("send failed")
1258                                            }
1259                                            block::SessionRequest::AttachVmo {
1260                                                vmo: _,
1261                                                responder,
1262                                            } => responder
1263                                                .send(Ok(&block::VmoId { id: 1 }))
1264                                                .expect("send failed"),
1265                                            block::SessionRequest::Close { responder } => {
1266                                                fifo_future_abort.abort();
1267                                                responder.send(Ok(())).expect("send failed")
1268                                            }
1269                                        }
1270                                    })
1271                                    .await
1272                            }
1273                            _ => panic!("Unexpected message"),
1274                        }
1275                    })
1276                    .await;
1277            };
1278
1279            let _result = join!(fifo_future, channel_future);
1280            //_result can be Err(Aborted) since FifoClose calls .abort but that's expected
1281        }
1282    }
1283
1284    #[fuchsia::test]
1285    async fn test_block_close_is_called() {
1286        let close_called = fuchsia_sync::Mutex::new(false);
1287        let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();
1288
1289        std::thread::spawn(move || {
1290            let _block_client =
1291                RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
1292            // The drop here should cause Close to be sent.
1293        });
1294
1295        let channel_handler = |request: &block::SessionRequest| -> bool {
1296            if let block::SessionRequest::Close { .. } = request {
1297                *close_called.lock() = true;
1298            }
1299            false
1300        };
1301        FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;
1302
1303        // After the server has finished running, we can check to see that close was called.
1304        assert!(*close_called.lock());
1305    }
1306
1307    #[fuchsia::test]
1308    async fn test_block_flush_is_called() {
1309        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();
1310
1311        struct Interface {
1312            flush_called: Arc<AtomicBool>,
1313        }
1314        impl block_server::async_interface::Interface for Interface {
1315            fn get_info(&self) -> Cow<'_, DeviceInfo> {
1316                Cow::Owned(DeviceInfo::Partition(PartitionInfo {
1317                    device_flags: fidl_fuchsia_storage_block::DeviceFlag::empty(),
1318                    max_transfer_blocks: None,
1319                    block_range: Some(0..1000),
1320                    type_guid: [0; 16],
1321                    instance_guid: [0; 16],
1322                    name: "foo".to_string(),
1323                    flags: 0,
1324                }))
1325            }
1326
1327            async fn read(
1328                &self,
1329                _device_block_offset: u64,
1330                _block_count: u32,
1331                _vmo: &Arc<zx::Vmo>,
1332                _vmo_offset: u64,
1333                _opts: ReadOptions,
1334                _trace_flow_id: Option<NonZero<u64>>,
1335            ) -> Result<(), zx::Status> {
1336                unreachable!();
1337            }
1338
1339            async fn write(
1340                &self,
1341                _device_block_offset: u64,
1342                _block_count: u32,
1343                _vmo: &Arc<zx::Vmo>,
1344                _vmo_offset: u64,
1345                _opts: WriteOptions,
1346                _trace_flow_id: Option<NonZero<u64>>,
1347            ) -> Result<(), zx::Status> {
1348                unreachable!();
1349            }
1350
1351            async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1352                self.flush_called.store(true, Ordering::Relaxed);
1353                Ok(())
1354            }
1355
1356            async fn trim(
1357                &self,
1358                _device_block_offset: u64,
1359                _block_count: u32,
1360                _trace_flow_id: Option<NonZero<u64>>,
1361            ) -> Result<(), zx::Status> {
1362                unreachable!();
1363            }
1364        }
1365
1366        let flush_called = Arc::new(AtomicBool::new(false));
1367
1368        futures::join!(
1369            async {
1370                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1371
1372                block_client.flush().await.expect("flush failed");
1373            },
1374            async {
1375                let block_server = BlockServer::new(
1376                    512,
1377                    Arc::new(Interface { flush_called: flush_called.clone() }),
1378                );
1379                block_server.handle_requests(stream.cast_stream()).await.unwrap();
1380            }
1381        );
1382
1383        assert!(flush_called.load(Ordering::Relaxed));
1384    }
1385
1386    #[fuchsia::test]
1387    async fn test_trace_flow_ids_set() {
1388        let (proxy, server) = fidl::endpoints::create_proxy();
1389
1390        futures::join!(
1391            async {
1392                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1393                block_client.flush().await.expect("flush failed");
1394            },
1395            async {
1396                let flow_id: fuchsia_sync::Mutex<Option<u64>> = fuchsia_sync::Mutex::new(None);
1397                let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
1398                    if request.trace_flow_id > 0 {
1399                        *flow_id.lock() = Some(request.trace_flow_id);
1400                    }
1401                    BlockFifoResponse {
1402                        status: zx::Status::OK.into_raw(),
1403                        reqid: request.reqid,
1404                        ..Default::default()
1405                    }
1406                };
1407                FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
1408                // After the server has finished running, verify the trace flow ID was set to some value.
1409                assert!(flow_id.lock().is_some());
1410            }
1411        );
1412    }
1413}