block_client/lib.rs

// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

//! A Rust client library for the Fuchsia block protocol.
//!
//! This crate provides a low-level client for interacting with block devices over the
//! `fuchsia.hardware.block` FIDL protocol and the FIFO interface for block I/O.
//!
//! See the [`BlockClient`] trait.
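//!
//! A minimal usage sketch (obtaining the `proxy` is environment-specific; a real component would
//! get it from its namespace or from devfs):
//!
//! ```no_run
//! # async fn example(proxy: fidl_fuchsia_hardware_block::BlockProxy) -> Result<(), zx::Status> {
//! use block_client::{BlockClient as _, MutableBufferSlice, RemoteBlockClient};
//!
//! let client = RemoteBlockClient::new(proxy).await?;
//! // Read the first block into an in-process buffer; transfers to and from plain memory are
//! // staged through an internal transfer VMO.
//! let mut buf = vec![0u8; client.block_size() as usize];
//! client.read_at(MutableBufferSlice::Memory(&mut buf[..]), 0).await?;
//! # Ok(())
//! # }
//! ```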

use fidl::endpoints::ServerEnd;
use fidl_fuchsia_hardware_block::{BlockProxy, MAX_TRANSFER_UNBOUNDED};
use fidl_fuchsia_hardware_block_partition::PartitionProxy;
use fidl_fuchsia_hardware_block_volume::VolumeProxy;
use fuchsia_sync::Mutex;
use futures::channel::oneshot;
use futures::executor::block_on;
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::future::Future;
use std::hash::{Hash, Hasher};
use std::mem::MaybeUninit;
use std::num::NonZero;
use std::ops::{DerefMut, Range};
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicU16, Ordering};
use std::task::{Context, Poll, Waker};
use zx::sys::zx_handle_t;
use zx::{self as zx, HandleBased as _};
use {
    fidl_fuchsia_hardware_block as block, fidl_fuchsia_hardware_block_driver as block_driver,
    fuchsia_async as fasync, storage_trace as trace,
};

pub use cache::Cache;

pub use block::Flag as BlockFlags;

pub use block_protocol::*;

pub mod cache;

const TEMP_VMO_SIZE: usize = 65536;

/// If a trace flow ID isn't specified for requests, one will be generated.
pub const NO_TRACE_ID: u64 = 0;

pub use block_driver::{BlockIoFlag, BlockOpcode};

fn fidl_to_status(error: fidl::Error) -> zx::Status {
    match error {
        fidl::Error::ClientChannelClosed { status, .. } => status,
        _ => zx::Status::INTERNAL,
    }
}

fn opcode_str(opcode: u8) -> &'static str {
    match BlockOpcode::from_primitive(opcode) {
        Some(BlockOpcode::Read) => "read",
        Some(BlockOpcode::Write) => "write",
        Some(BlockOpcode::Flush) => "flush",
        Some(BlockOpcode::Trim) => "trim",
        Some(BlockOpcode::CloseVmo) => "close_vmo",
        None => "unknown",
    }
}

// Generates a trace ID that will be unique across the system (as long as |request_id| isn't
// reused within this process).
fn generate_trace_flow_id(request_id: u32) -> u64 {
    lazy_static! {
        static ref SELF_HANDLE: zx_handle_t = fuchsia_runtime::process_self().raw_handle();
    };
    // The process handle fills the low 32 bits; the request ID fills the high 32 bits.
    *SELF_HANDLE as u64 + ((request_id as u64) << 32)
}

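/// A source of data for a write: either a region of a VMO previously registered with
/// [`BlockClient::attach_vmo`], or a plain in-process slice (which the client stages through an
/// internal transfer VMO). A sketch of constructing both forms; the offset and length here are
/// illustrative and must be multiples of the device block size:
///
/// ```no_run
/// # fn example(vmo_id: &block_client::VmoId, data: &[u8]) {
/// use block_client::BufferSlice;
///
/// let from_vmo = BufferSlice::new_with_vmo_id(vmo_id, 0, 8192);
/// let from_memory = BufferSlice::from(data);
/// # }
/// ```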
pub enum BufferSlice<'a> {
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    Memory(&'a [u8]),
}

impl<'a> BufferSlice<'a> {
    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
        BufferSlice::VmoId { vmo_id, offset, length }
    }
}

impl<'a> From<&'a [u8]> for BufferSlice<'a> {
    fn from(buf: &'a [u8]) -> Self {
        BufferSlice::Memory(buf)
    }
}

/// Like [`BufferSlice`], but the destination of a read: a VMO region or a mutable memory slice.
pub enum MutableBufferSlice<'a> {
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    Memory(&'a mut [u8]),
}

impl<'a> MutableBufferSlice<'a> {
    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
        MutableBufferSlice::VmoId { vmo_id, offset, length }
    }
}

impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
    fn from(buf: &'a mut [u8]) -> Self {
        MutableBufferSlice::Memory(buf)
    }
}

#[derive(Default)]
struct RequestState {
    result: Option<zx::Status>,
    waker: Option<Waker>,
}

#[derive(Default)]
struct FifoState {
    // The fifo.
    fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,

    // The next request ID to be used.
    next_request_id: u32,

    // A queue of messages to be sent on the fifo.
    queue: std::collections::VecDeque<BlockFifoRequest>,

    // Map from request ID to RequestState.
    map: HashMap<u32, RequestState>,

    // The waker for the FifoPoller.
    poller_waker: Option<Waker>,

    // If set, attach a barrier to the next write request.
    attach_barrier: bool,
}

impl FifoState {
    fn terminate(&mut self) {
        self.fifo.take();
        for (_, request_state) in self.map.iter_mut() {
            request_state.result.get_or_insert(zx::Status::CANCELED);
            if let Some(waker) = request_state.waker.take() {
                waker.wake();
            }
        }
        if let Some(waker) = self.poller_waker.take() {
            waker.wake();
        }
    }

    // Returns true if polling should be terminated.
    fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
        let fifo = if let Some(fifo) = self.fifo.as_ref() {
            fifo
        } else {
            return true;
        };

        loop {
            let slice = self.queue.as_slices().0;
            if slice.is_empty() {
                return false;
            }
            match fifo.try_write(context, slice) {
                Poll::Ready(Ok(sent)) => {
                    self.queue.drain(0..sent);
                }
                Poll::Ready(Err(_)) => {
                    self.terminate();
                    return true;
                }
                Poll::Pending => {
                    return false;
                }
            }
        }
    }
}

type FifoStateRef = Arc<Mutex<FifoState>>;

// A future used for fifo responses.
struct ResponseFuture {
    request_id: u32,
    fifo_state: FifoStateRef,
}

impl ResponseFuture {
    fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
        ResponseFuture { request_id, fifo_state }
    }
}

impl Future for ResponseFuture {
    type Output = Result<(), zx::Status>;

    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.fifo_state.lock();
        let request_state = state.map.get_mut(&self.request_id).unwrap();
        if let Some(result) = request_state.result {
            Poll::Ready(result.into())
        } else {
            request_state.waker.replace(context.waker().clone());
            Poll::Pending
        }
    }
}

impl Drop for ResponseFuture {
    fn drop(&mut self) {
        self.fifo_state.lock().map.remove(&self.request_id).unwrap();
    }
}

/// Wraps a vmo-id. Will panic on drop if you forget to detach.
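///
/// A sketch of the expected lifecycle (error handling elided):
///
/// ```no_run
/// # async fn example(client: &impl block_client::BlockClient) -> Result<(), zx::Status> {
/// let vmo = zx::Vmo::create(8192)?;
/// let vmo_id = client.attach_vmo(&vmo).await?;
/// // ... issue reads and writes that reference `vmo_id` ...
/// client.detach_vmo(vmo_id).await?; // Consumes the id; dropping a still-valid VmoId panics.
/// # Ok(())
/// # }
/// ```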
#[derive(Debug)]
#[must_use]
pub struct VmoId(AtomicU16);

impl VmoId {
    /// VmoIds will normally be vended by `attach_vmo`, but this might be used in some tests.
    pub fn new(id: u16) -> Self {
        Self(AtomicU16::new(id))
    }

    /// Invalidates self and returns a new VmoId with the same underlying ID.
    pub fn take(&self) -> Self {
        Self(AtomicU16::new(self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)))
    }

    pub fn is_valid(&self) -> bool {
        self.id() != block_driver::BLOCK_VMOID_INVALID
    }

    /// Takes the ID.  The caller assumes responsibility for detaching.
    #[must_use]
    pub fn into_id(self) -> u16 {
        self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)
    }

    pub fn id(&self) -> u16 {
        self.0.load(Ordering::Relaxed)
    }
}

impl PartialEq for VmoId {
    fn eq(&self, other: &Self) -> bool {
        self.id() == other.id()
    }
}

impl Eq for VmoId {}

impl Drop for VmoId {
    fn drop(&mut self) {
        assert_eq!(
            self.0.load(Ordering::Relaxed),
            block_driver::BLOCK_VMOID_INVALID,
            "Did you forget to detach?"
        );
    }
}

impl Hash for VmoId {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.id().hash(state);
    }
}

/// Represents a client connection to a block device. This is a simplified version of the
/// block.fidl interface.
///
/// Most users will use the [`RemoteBlockClient`] implementation of this trait.
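///
/// A sketch of a hypothetical helper written against the trait rather than a concrete client:
///
/// ```no_run
/// use block_client::{BlockClient, BufferSlice};
///
/// // Zeroes the first block of any device (the block size is queried from the client).
/// async fn zero_first_block(client: &impl BlockClient) -> Result<(), zx::Status> {
///     let buf = vec![0u8; client.block_size() as usize];
///     client.write_at(BufferSlice::Memory(&buf[..]), 0).await
/// }
/// ```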
pub trait BlockClient: Send + Sync {
    /// Wraps AttachVmo from fuchsia.hardware.block::Block.
    fn attach_vmo(&self, vmo: &zx::Vmo) -> impl Future<Output = Result<VmoId, zx::Status>> + Send;

    /// Detaches the given vmo-id from the device.
    fn detach_vmo(&self, vmo_id: VmoId) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Reads from the device at |device_offset| into the given buffer slice.
    fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        self.read_at_with_opts_traced(
            buffer_slice,
            device_offset,
            ReadOptions::default(),
            NO_TRACE_ID,
        )
    }

    fn read_at_with_opts(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
        opts: ReadOptions,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        self.read_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID)
    }

    fn read_at_with_opts_traced(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
        opts: ReadOptions,
        trace_flow_id: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Writes the data in |buffer_slice| to the device.
    fn write_at(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        self.write_at_with_opts_traced(
            buffer_slice,
            device_offset,
            WriteOptions::default(),
            NO_TRACE_ID,
        )
    }

    fn write_at_with_opts(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID)
    }

    fn write_at_with_opts_traced(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
        trace_flow_id: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Trims the given range on the block device.
    fn trim(
        &self,
        device_range: Range<u64>,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
        self.trim_traced(device_range, NO_TRACE_ID)
    }

    fn trim_traced(
        &self,
        device_range: Range<u64>,
        trace_flow_id: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Attaches a barrier to the next write sent to the underlying block device. This is an
    /// alternative to setting `WriteFlags::PRE_BARRIER` in the options passed to
    /// `write_at_with_opts`, and it makes it easier to guarantee that the barrier is attached to
    /// the correct write operation when subsequent write operations can get reordered.
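    ///
    /// A sketch of enforcing ordering with a barrier (the data and offsets are illustrative):
    ///
    /// ```no_run
    /// # async fn example(client: &impl block_client::BlockClient) -> Result<(), zx::Status> {
    /// # let (data, commit_record) = (vec![0u8; 4096], vec![0u8; 4096]);
    /// use block_client::BufferSlice;
    /// client.write_at(BufferSlice::Memory(&data[..]), 0).await?;
    /// client.barrier();
    /// // This write will not be applied ahead of the writes issued before the barrier.
    /// client.write_at(BufferSlice::Memory(&commit_record[..]), 4096).await?;
    /// # Ok(())
    /// # }
    /// ```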
    fn barrier(&self);

    fn flush(&self) -> impl Future<Output = Result<(), zx::Status>> + Send {
        self.flush_traced(NO_TRACE_ID)
    }

    /// Sends a flush request to the underlying block device.
    fn flush_traced(
        &self,
        trace_flow_id: u64,
    ) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Closes the fifo.
    fn close(&self) -> impl Future<Output = Result<(), zx::Status>> + Send;

    /// Returns the block size of the device.
    fn block_size(&self) -> u32;

    /// Returns the size, in blocks, of the device.
    fn block_count(&self) -> u64;

    /// Returns the maximum number of blocks which can be transferred in a single request.
    fn max_transfer_blocks(&self) -> Option<NonZero<u32>>;

    /// Returns the block flags reported by the device.
    fn block_flags(&self) -> BlockFlags;

    /// Returns true if the remote fifo is still connected.
    fn is_connected(&self) -> bool;
}

struct Common {
    block_size: u32,
    block_count: u64,
    max_transfer_blocks: Option<NonZero<u32>>,
    block_flags: BlockFlags,
    fifo_state: FifoStateRef,
    temp_vmo: futures::lock::Mutex<zx::Vmo>,
    temp_vmo_id: VmoId,
}

impl Common {
    fn new(
        fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
        info: &block::BlockInfo,
        temp_vmo: zx::Vmo,
        temp_vmo_id: VmoId,
    ) -> Self {
        let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
        fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
        Self {
            block_size: info.block_size,
            block_count: info.block_count,
            max_transfer_blocks: if info.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
                NonZero::new(info.max_transfer_size / info.block_size)
            } else {
                None
            },
            block_flags: info.flags,
            fifo_state,
            temp_vmo: futures::lock::Mutex::new(temp_vmo),
            temp_vmo_id,
        }
    }

    fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
        if bytes % self.block_size as u64 != 0 {
            Err(zx::Status::INVALID_ARGS)
        } else {
            Ok(bytes / self.block_size as u64)
        }
    }

    // Sends the request and waits for the response.
    async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
        async move {
            let (request_id, trace_flow_id) = {
                let mut state = self.fifo_state.lock();

                let mut flags = BlockIoFlag::from_bits_retain(request.command.flags);
                if BlockOpcode::from_primitive(request.command.opcode) == Some(BlockOpcode::Write)
                    && state.attach_barrier
                {
                    flags |= BlockIoFlag::PRE_BARRIER;
                    request.command.flags = flags.bits();
                    state.attach_barrier = false;
                }

                if state.fifo.is_none() {
                    // Fifo has been closed.
                    return Err(zx::Status::CANCELED);
                }
                trace::duration!(
                    c"storage",
                    c"block_client::send::start",
                    "op" => opcode_str(request.command.opcode)
                );
                let request_id = state.next_request_id;
                state.next_request_id = state.next_request_id.wrapping_add(1);
                assert!(
                    state.map.insert(request_id, RequestState::default()).is_none(),
                    "request id in use!"
                );
                request.reqid = request_id;
                if request.trace_flow_id == NO_TRACE_ID {
                    request.trace_flow_id = generate_trace_flow_id(request_id);
                }
                let trace_flow_id = request.trace_flow_id;
                trace::flow_begin!(c"storage", c"block_client::send", trace_flow_id);
                state.queue.push_back(request);
                if let Some(waker) = state.poller_waker.clone() {
                    state.poll_send_requests(&mut Context::from_waker(&waker));
                }
                (request_id, trace_flow_id)
            };
            ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
            trace::duration!(c"storage", c"block_client::send::end");
            trace::flow_end!(c"storage", c"block_client::send", trace_flow_id);
            Ok(())
        }
        .await
    }

    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
        self.send(BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: BlockOpcode::CloseVmo.into_primitive(),
                flags: 0,
                ..Default::default()
            },
            vmoid: vmo_id.into_id(),
            ..Default::default()
        })
        .await
    }

    async fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
        opts: ReadOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        match buffer_slice {
            MutableBufferSlice::VmoId { vmo_id, offset, length } => {
                self.send(BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Read.into_primitive(),
                        flags: 0,
                        ..Default::default()
                    },
                    vmoid: vmo_id.id(),
                    length: self
                        .to_blocks(length)?
                        .try_into()
                        .map_err(|_| zx::Status::INVALID_ARGS)?,
                    vmo_offset: self.to_blocks(offset)?,
                    dev_offset: self.to_blocks(device_offset)?,
                    trace_flow_id,
                    dun: opts.inline_crypto_options.dun,
                    slot: opts.inline_crypto_options.slot,
                    ..Default::default()
                })
                .await?
            }
            MutableBufferSlice::Memory(mut slice) => {
                let temp_vmo = self.temp_vmo.lock().await;
                let mut device_block = self.to_blocks(device_offset)?;
                loop {
                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
                    let block_count = self.to_blocks(to_do as u64)? as u32;
                    self.send(BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Read.into_primitive(),
                            flags: 0,
                            ..Default::default()
                        },
                        vmoid: self.temp_vmo_id.id(),
                        length: block_count,
                        vmo_offset: 0,
                        dev_offset: device_block,
                        trace_flow_id,
                        dun: opts.inline_crypto_options.dun,
                        slot: opts.inline_crypto_options.slot,
                        ..Default::default()
                    })
                    .await?;
                    temp_vmo.read(&mut slice[..to_do], 0)?;
                    if to_do == slice.len() {
                        break;
                    }
                    device_block += block_count as u64;
                    slice = &mut slice[to_do..];
                }
            }
        }
        Ok(())
    }

    async fn write_at(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        let mut flags = BlockIoFlag::empty();

        if opts.flags.contains(WriteFlags::FORCE_ACCESS) {
            flags |= BlockIoFlag::FORCE_ACCESS;
        }

        if opts.flags.contains(WriteFlags::PRE_BARRIER) {
            flags |= BlockIoFlag::PRE_BARRIER;
        }

        match buffer_slice {
            BufferSlice::VmoId { vmo_id, offset, length } => {
                self.send(BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Write.into_primitive(),
                        flags: flags.bits(),
                        ..Default::default()
                    },
                    vmoid: vmo_id.id(),
                    length: self
                        .to_blocks(length)?
                        .try_into()
                        .map_err(|_| zx::Status::INVALID_ARGS)?,
                    vmo_offset: self.to_blocks(offset)?,
                    dev_offset: self.to_blocks(device_offset)?,
                    trace_flow_id,
                    dun: opts.inline_crypto_options.dun,
                    slot: opts.inline_crypto_options.slot,
                    ..Default::default()
                })
                .await?;
            }
            BufferSlice::Memory(mut slice) => {
                let temp_vmo = self.temp_vmo.lock().await;
                let mut device_block = self.to_blocks(device_offset)?;
                loop {
                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
                    let block_count = self.to_blocks(to_do as u64)? as u32;
                    temp_vmo.write(&slice[..to_do], 0)?;
                    self.send(BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Write.into_primitive(),
                            flags: flags.bits(),
                            ..Default::default()
                        },
                        vmoid: self.temp_vmo_id.id(),
                        length: block_count,
                        vmo_offset: 0,
                        dev_offset: device_block,
                        trace_flow_id,
                        dun: opts.inline_crypto_options.dun,
                        slot: opts.inline_crypto_options.slot,
                        ..Default::default()
                    })
                    .await?;
                    if to_do == slice.len() {
                        break;
                    }
                    device_block += block_count as u64;
                    slice = &slice[to_do..];
                }
            }
        }
        Ok(())
    }

    async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
        let length = self.to_blocks(device_range.end - device_range.start)? as u32;
        let dev_offset = self.to_blocks(device_range.start)?;
        self.send(BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: BlockOpcode::Trim.into_primitive(),
                flags: 0,
                ..Default::default()
            },
            vmoid: block_driver::BLOCK_VMOID_INVALID,
            length,
            dev_offset,
            trace_flow_id,
            ..Default::default()
        })
        .await
    }

    async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
        self.send(BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: BlockOpcode::Flush.into_primitive(),
                flags: 0,
                ..Default::default()
            },
            vmoid: block_driver::BLOCK_VMOID_INVALID,
            trace_flow_id,
            ..Default::default()
        })
        .await
    }

    fn barrier(&self) {
        self.fifo_state.lock().attach_barrier = true;
    }

    fn block_size(&self) -> u32 {
        self.block_size
    }

    fn block_count(&self) -> u64 {
        self.block_count
    }

    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
        self.max_transfer_blocks
    }

    fn block_flags(&self) -> BlockFlags {
        self.block_flags
    }

    fn is_connected(&self) -> bool {
        self.fifo_state.lock().fifo.is_some()
    }
}

impl Drop for Common {
    fn drop(&mut self) {
        // It's OK to leak the VMO id because the server will dump all VMOs when the fifo is torn
        // down.
        let _ = self.temp_vmo_id.take().into_id();
        self.fifo_state.lock().terminate();
    }
}

/// RemoteBlockClient is a [`BlockClient`] that communicates with a real block device over FIDL.
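///
/// A sketch of VMO-registered I/O against a connected client (the sizes are illustrative and
/// must be multiples of the device block size):
///
/// ```no_run
/// # async fn example(client: &block_client::RemoteBlockClient) -> Result<(), zx::Status> {
/// use block_client::{BlockClient as _, BufferSlice, MutableBufferSlice};
///
/// let vmo = zx::Vmo::create(131072)?;
/// let vmo_id = client.attach_vmo(&vmo).await?;
/// // Write the first 8192 bytes of the VMO to device offset 0, then read them back.
/// client.write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 8192), 0).await?;
/// client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 8192, 8192), 0).await?;
/// client.detach_vmo(vmo_id).await?;
/// # Ok(())
/// # }
/// ```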
pub struct RemoteBlockClient {
    session: block::SessionProxy,
    common: Common,
}

/// Abstracts over the proxy types (Block, Partition, Volume) that expose the Block protocol, so
/// that [`RemoteBlockClient::new`] can accept any of them.
pub trait AsBlockProxy {
    fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>>;

    fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error>;
}

impl<T: AsBlockProxy> AsBlockProxy for &T {
    fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>> {
        AsBlockProxy::get_info(*self)
    }
    fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error> {
        AsBlockProxy::open_session(*self, session)
    }
}

macro_rules! impl_as_block_proxy {
    ($name:ident) => {
        impl AsBlockProxy for $name {
            async fn get_info(&self) -> Result<block::BlockGetInfoResult, fidl::Error> {
                $name::get_info(self).await
            }

            fn open_session(
                &self,
                session: ServerEnd<block::SessionMarker>,
            ) -> Result<(), fidl::Error> {
                $name::open_session(self, session)
            }
        }
    };
}

impl_as_block_proxy!(BlockProxy);
impl_as_block_proxy!(PartitionProxy);
impl_as_block_proxy!(VolumeProxy);

impl RemoteBlockClient {
    /// Returns a connection to a remote block device via the given channel.
    pub async fn new(remote: impl AsBlockProxy) -> Result<Self, zx::Status> {
        let info =
            remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        let (session, server) = fidl::endpoints::create_proxy();
        let () = remote.open_session(server).map_err(fidl_to_status)?;
        Self::from_session(info, session).await
    }

    pub async fn from_session(
        info: block::BlockInfo,
        session: block::SessionProxy,
    ) -> Result<Self, zx::Status> {
        let fifo =
            session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        let fifo = fasync::Fifo::from_fifo(fifo);
        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id =
            session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        let vmo_id = VmoId::new(vmo_id.id);
        Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
    }
}

impl BlockClient for RemoteBlockClient {
    async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = self
            .session
            .attach_vmo(dup)
            .await
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(VmoId::new(vmo_id.id))
    }

    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
        self.common.detach_vmo(vmo_id).await
    }

    async fn read_at_with_opts_traced(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
        opts: ReadOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        self.common.read_at(buffer_slice, device_offset, opts, trace_flow_id).await
    }

    async fn write_at_with_opts_traced(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
    }

    async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
        self.common.trim(range, trace_flow_id).await
    }

    async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
        self.common.flush(trace_flow_id).await
    }

    fn barrier(&self) {
        self.common.barrier()
    }

    async fn close(&self) -> Result<(), zx::Status> {
        let () =
            self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        Ok(())
    }

    fn block_size(&self) -> u32 {
        self.common.block_size()
    }

    fn block_count(&self) -> u64 {
        self.common.block_count()
    }

    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
        self.common.max_transfer_blocks()
    }

    fn block_flags(&self) -> BlockFlags {
        self.common.block_flags()
    }

    fn is_connected(&self) -> bool {
        self.common.is_connected()
    }
}

/// A synchronous (blocking) client for a remote block device, for use on threads that don't run
/// an async executor; see [`RemoteBlockClientSync::new`].
pub struct RemoteBlockClientSync {
    session: block::SessionSynchronousProxy,
    common: Common,
}

impl RemoteBlockClientSync {
    /// Returns a connection to a remote block device via the given channel. A separate thread is
    /// spawned to poll the fifo, so this works even when no executor is configured for the
    /// calling thread.
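    ///
    /// A minimal synchronous sketch (the client end is assumed to come from elsewhere):
    ///
    /// ```no_run
    /// # fn example(
    /// #     client_end: fidl::endpoints::ClientEnd<fidl_fuchsia_hardware_block::BlockMarker>,
    /// # ) -> Result<(), zx::Status> {
    /// use block_client::{MutableBufferSlice, RemoteBlockClientSync};
    ///
    /// let client = RemoteBlockClientSync::new(client_end)?;
    /// let mut buf = vec![0u8; client.block_size() as usize];
    /// client.read_at(MutableBufferSlice::Memory(&mut buf[..]), 0)?;
    /// client.close()?;
    /// # Ok(())
    /// # }
    /// ```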
    pub fn new(
        client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
    ) -> Result<Self, zx::Status> {
        let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
        let info = remote
            .get_info(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let (client, server) = fidl::endpoints::create_endpoints();
        let () = remote.open_session(server).map_err(fidl_to_status)?;
        let session = block::SessionSynchronousProxy::new(client.into_channel());
        let fifo = session
            .get_fifo(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = session
            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let vmo_id = VmoId::new(vmo_id.id);

        // The fifo needs to be instantiated from the thread that has the executor, as that's
        // where the fifo registers for notifications to be delivered.
        let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
        std::thread::spawn(move || {
            let mut executor = fasync::LocalExecutor::new();
            let fifo = fasync::Fifo::from_fifo(fifo);
            let common = Common::new(fifo, &info, temp_vmo, vmo_id);
            let fifo_state = common.fifo_state.clone();
            let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
            executor.run_singlethreaded(FifoPoller { fifo_state });
        });
        block_on(receiver).map_err(|_| zx::Status::CANCELED)?
    }

    pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = self
            .session
            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(VmoId::new(vmo_id.id))
    }

    pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
        block_on(self.common.detach_vmo(vmo_id))
    }

    pub fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), zx::Status> {
        block_on(self.common.read_at(
            buffer_slice,
            device_offset,
            ReadOptions::default(),
            NO_TRACE_ID,
        ))
    }

    pub fn write_at(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), zx::Status> {
        block_on(self.common.write_at(
            buffer_slice,
            device_offset,
            WriteOptions::default(),
            NO_TRACE_ID,
        ))
    }

    pub fn flush(&self) -> Result<(), zx::Status> {
        block_on(self.common.flush(NO_TRACE_ID))
    }

    pub fn close(&self) -> Result<(), zx::Status> {
        let () = self
            .session
            .close(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(())
    }

    pub fn block_size(&self) -> u32 {
        self.common.block_size()
    }

    pub fn block_count(&self) -> u64 {
        self.common.block_count()
    }

    pub fn is_connected(&self) -> bool {
        self.common.is_connected()
    }
}

impl Drop for RemoteBlockClientSync {
    fn drop(&mut self) {
        // Ignore errors here as there is not much we can do about them.
        let _ = self.close();
    }
}

// FifoPoller is a future responsible for sending and receiving from the fifo.
struct FifoPoller {
    fifo_state: FifoStateRef,
}

impl Future for FifoPoller {
    type Output = ();

    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state_lock = self.fifo_state.lock();
        let state = state_lock.deref_mut(); // So that we can split the borrow.

        // Send requests.
        if state.poll_send_requests(context) {
            return Poll::Ready(());
        }

        // Receive responses.
        let fifo = state.fifo.as_ref().unwrap(); // Safe because poll_send_requests checks.
        loop {
            let mut response = MaybeUninit::uninit();
            match fifo.try_read(context, &mut response) {
                Poll::Pending => {
                    state.poller_waker = Some(context.waker().clone());
                    return Poll::Pending;
                }
                Poll::Ready(Ok(_)) => {
                    let response = unsafe { response.assume_init() };
                    let request_id = response.reqid;
                    // If the request isn't in the map, assume that it's a cancelled read.
                    if let Some(request_state) = state.map.get_mut(&request_id) {
                        request_state.result.replace(zx::Status::from_raw(response.status));
                        if let Some(waker) = request_state.waker.take() {
                            waker.wake();
                        }
                    }
                }
                Poll::Ready(Err(_)) => {
                    state.terminate();
                    return Poll::Ready(());
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{
        BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
        RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
    };
    use block_server::{BlockServer, DeviceInfo, PartitionInfo};
    use fidl::endpoints::RequestStream as _;
    use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
    use futures::join;
    use futures::stream::StreamExt as _;
    use futures::stream::futures_unordered::FuturesUnordered;
    use ramdevice_client::RamdiskClient;
    use std::borrow::Cow;
    use std::num::NonZero;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};
    use {fidl_fuchsia_hardware_block as block, fuchsia_async as fasync};

    const RAMDISK_BLOCK_SIZE: u64 = 1024;
    const RAMDISK_BLOCK_COUNT: u64 = 1024;

    pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
        let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
            .await
            .expect("RamdiskClient::create failed");
        let client_end = ramdisk.open().expect("ramdisk.open failed");
        let proxy = client_end.into_proxy();
        let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
        assert_eq!(block_client.block_size(), 1024);
        let client_end = ramdisk.open().expect("ramdisk.open failed");
        let proxy = client_end.into_proxy();
        (ramdisk, proxy, block_client)
    }

    #[fuchsia::test]
    async fn test_against_ram_disk() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;

        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        vmo.write(b"hello", 5).expect("vmo.write failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        block_client
            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
            .await
            .expect("write_at failed");
        block_client
            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
            .await
            .expect("read_at failed");
        let mut buf: [u8; 5] = Default::default();
        vmo.read(&mut buf, 1029).expect("vmo.read failed");
        assert_eq!(&buf, b"hello");
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }

    #[fuchsia::test]
    async fn test_alignment() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        block_client
            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
            .await
            .expect_err("expected failure due to bad alignment");
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }

    #[fuchsia::test]
    async fn test_parallel_io() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        let mut reads = Vec::new();
        for _ in 0..1024 {
            reads.push(
                block_client
                    .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
                    .inspect_err(|e| panic!("read should have succeeded: {}", e)),
            );
        }
        futures::future::join_all(reads).await;
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }

    #[fuchsia::test]
    async fn test_closed_device() {
        let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        let mut reads = Vec::new();
        for _ in 0..1024 {
            reads.push(
                block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
            );
        }
        assert!(block_client.is_connected());
        let _ = futures::join!(futures::future::join_all(reads), async {
            ramdisk.destroy().await.expect("ramdisk.destroy failed")
        });
        // Destroying the ramdisk is asynchronous. Keep issuing reads until they start failing.
        while block_client
            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
            .await
            .is_ok()
        {}

        // Sometimes requests start getting rejected before the FIFO is actually closed, so we can
        // get false positives from is_connected.
        while block_client.is_connected() {
            // Sleep for a bit to minimise lock contention.
            fasync::Timer::new(fasync::MonotonicInstant::after(
                zx::MonotonicDuration::from_millis(500),
            ))
            .await;
        }

        // But once is_connected returns false, it should stay false.
        assert!(!block_client.is_connected());
        let _ = block_client.detach_vmo(vmo_id).await;
    }

    #[fuchsia::test]
    async fn test_cancelled_reads() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        {
            let mut reads = FuturesUnordered::new();
            for _ in 0..1024 {
                reads.push(
                    block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
                );
            }
            // Read the first 500 results and then dump the rest.
            for _ in 0..500 {
                reads.next().await;
            }
        }
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }

    #[fuchsia::test]
    async fn test_parallel_large_read_and_write_with_memory_succeeds() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let block_client_ref = &block_client;
        let test_one = |offset, len, fill| async move {
            let buf = vec![fill; len];
            block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
            // Read back an extra block either side.
            let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
            block_client_ref
                .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
                .await
                .expect("read_at failed");
            assert_eq!(
                &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
                &[0; RAMDISK_BLOCK_SIZE as usize][..]
            );
            assert_eq!(
                &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
                &buf[..]
            );
            assert_eq!(
                &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
                &[0; RAMDISK_BLOCK_SIZE as usize][..]
            );
        };
        const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
        join!(
            test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
            test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
        );
    }

    // Implements a dummy server which can be used by test cases to verify whether channel
    // messages and fifo operations are being received, via the channel_handler and fifo_handler
    // callbacks respectively.
    struct FakeBlockServer<'a> {
        server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
        channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
        fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
    }

    impl<'a> FakeBlockServer<'a> {
        // Creates a new FakeBlockServer given a channel to listen on.
        //
        // The 'channel_handler' and 'fifo_handler' closures allow tests to customize how the
        // server handles requests received from the channel and the fifo respectively.
        //
        // 'channel_handler' receives a message before it is handled by the default implementation
        // and can return 'true' to indicate all processing is done and no further processing of
        // that message is required.
        //
        // 'fifo_handler' takes as input a BlockFifoRequest and produces a response which the
        // FakeBlockServer will send over the fifo.
        fn new(
            server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
            channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
            fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
        ) -> FakeBlockServer<'a> {
            FakeBlockServer {
                server_channel: Some(server_channel),
                channel_handler: Box::new(channel_handler),
                fifo_handler: Box::new(fifo_handler),
            }
        }

        // Runs the server.
        async fn run(&mut self) {
            let server = self.server_channel.take().unwrap();

            // Set up a mock server.
            let (server_fifo, client_fifo) =
                zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
                    .expect("Fifo::create failed");
            let maybe_server_fifo = fuchsia_sync::Mutex::new(Some(client_fifo));

            let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
            let fifo_future = Abortable::new(
                async {
                    let mut fifo = fasync::Fifo::from_fifo(server_fifo);
                    let (mut reader, mut writer) = fifo.async_io();
                    let mut request = BlockFifoRequest::default();
                    loop {
                        match reader.read_entries(&mut request).await {
                            Ok(1) => {}
                            Err(zx::Status::PEER_CLOSED) => break,
                            Err(e) => panic!("read_entry failed {:?}", e),
                            _ => unreachable!(),
                        };

                        let response = self.fifo_handler.as_ref()(request);
                        writer
                            .write_entries(std::slice::from_ref(&response))
                            .await
                            .expect("write_entries failed");
                    }
                },
                fifo_future_abort_registration,
            );

            let channel_future = async {
                server
                    .into_stream()
                    .for_each_concurrent(None, |request| async {
                        let request = request.expect("unexpected fidl error");

                        match request {
                            block::BlockRequest::GetInfo { responder } => {
                                responder
                                    .send(Ok(&block::BlockInfo {
                                        block_count: 1024,
                                        block_size: 512,
                                        max_transfer_size: 1024 * 1024,
                                        flags: block::Flag::empty(),
                                    }))
                                    .expect("send failed");
                            }
                            block::BlockRequest::OpenSession { session, control_handle: _ } => {
                                let stream = session.into_stream();
                                stream
                                    .for_each(|request| async {
                                        let request = request.expect("unexpected fidl error");
                                        // Give the test a chance to register and potentially
                                        // handle the event.
                                        if self.channel_handler.as_ref()(&request) {
                                            return;
                                        }
                                        match request {
                                            block::SessionRequest::GetFifo { responder } => {
                                                match maybe_server_fifo.lock().take() {
                                                    Some(fifo) => {
                                                        responder.send(Ok(fifo.downcast()))
                                                    }
                                                    None => responder.send(Err(
                                                        zx::Status::NO_RESOURCES.into_raw(),
                                                    )),
                                                }
                                                .expect("send failed")
                                            }
                                            block::SessionRequest::AttachVmo {
                                                vmo: _,
                                                responder,
                                            } => responder
                                                .send(Ok(&block::VmoId { id: 1 }))
                                                .expect("send failed"),
                                            block::SessionRequest::Close { responder } => {
                                                fifo_future_abort.abort();
                                                responder.send(Ok(())).expect("send failed")
                                            }
                                        }
                                    })
                                    .await
                            }
                            _ => panic!("Unexpected message"),
                        }
                    })
                    .await;
            };

            let _result = join!(fifo_future, channel_future);
            // _result can be Err(Aborted) since the Close handler calls abort, but that's
            // expected.
        }
    }

    #[fuchsia::test]
    async fn test_block_close_is_called() {
        let close_called = fuchsia_sync::Mutex::new(false);
        let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();

        std::thread::spawn(move || {
            let _block_client =
                RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
            // The drop here should cause Close to be sent.
        });

        let channel_handler = |request: &block::SessionRequest| -> bool {
            if let block::SessionRequest::Close { .. } = request {
                *close_called.lock() = true;
            }
            false
        };
        FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;

        // After the server has finished running, we can check to see that close was called.
        assert!(*close_called.lock());
    }

    #[fuchsia::test]
    async fn test_block_flush_is_called() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();

        struct Interface {
            flush_called: Arc<AtomicBool>,
        }
        impl block_server::async_interface::Interface for Interface {
            async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
                Ok(Cow::Owned(DeviceInfo::Partition(PartitionInfo {
                    device_flags: fidl_fuchsia_hardware_block::Flag::empty(),
                    max_transfer_blocks: None,
                    block_range: Some(0..1000),
                    type_guid: [0; 16],
                    instance_guid: [0; 16],
                    name: "foo".to_string(),
                    flags: 0,
                })))
            }

            async fn read(
                &self,
                _device_block_offset: u64,
                _block_count: u32,
                _vmo: &Arc<zx::Vmo>,
                _vmo_offset: u64,
                _trace_flow_id: Option<NonZero<u64>>,
            ) -> Result<(), zx::Status> {
                unreachable!();
            }

            fn barrier(&self) -> Result<(), zx::Status> {
                unreachable!()
            }

            async fn write(
                &self,
                _device_block_offset: u64,
                _block_count: u32,
                _vmo: &Arc<zx::Vmo>,
                _vmo_offset: u64,
                _write_opts: WriteOptions,
                _trace_flow_id: Option<NonZero<u64>>,
            ) -> Result<(), zx::Status> {
                unreachable!();
            }

            async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
                self.flush_called.store(true, Ordering::Relaxed);
                Ok(())
            }

            async fn trim(
                &self,
                _device_block_offset: u64,
                _block_count: u32,
                _trace_flow_id: Option<NonZero<u64>>,
            ) -> Result<(), zx::Status> {
                unreachable!();
            }
        }

        let flush_called = Arc::new(AtomicBool::new(false));

        futures::join!(
            async {
                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");

                block_client.flush().await.expect("flush failed");
            },
            async {
                let block_server = BlockServer::new(
                    512,
                    Arc::new(Interface { flush_called: flush_called.clone() }),
                );
                block_server.handle_requests(stream.cast_stream()).await.unwrap();
            }
        );

        assert!(flush_called.load(Ordering::Relaxed));
    }

    #[fuchsia::test]
    async fn test_trace_flow_ids_set() {
        let (proxy, server) = fidl::endpoints::create_proxy();

        futures::join!(
            async {
                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
                block_client.flush().await.expect("flush failed");
            },
            async {
                let flow_id: fuchsia_sync::Mutex<Option<u64>> = fuchsia_sync::Mutex::new(None);
                let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
                    if request.trace_flow_id > 0 {
                        *flow_id.lock() = Some(request.trace_flow_id);
                    }
                    BlockFifoResponse {
                        status: zx::Status::OK.into_raw(),
                        reqid: request.reqid,
                        ..Default::default()
                    }
                };
                FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
                // After the server has finished running, verify the trace flow ID was set to
                // some value.
                assert!(flow_id.lock().is_some());
            }
        );
    }
}