block_client/
lib.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5//! A Rust client library for the Fuchsia block protocol.
6//!
7//! This crate provides a low-level client for interacting with block devices over the
8//! `fuchsia.hardware.block` FIDL protocol and the FIFO interface for block I/O.
9//!
10//! See the [`BlockClient`] trait.
11
12use fidl::endpoints::ServerEnd;
13use fidl_fuchsia_hardware_block::{BlockProxy, MAX_TRANSFER_UNBOUNDED};
14use fidl_fuchsia_hardware_block_partition::PartitionProxy;
15use fidl_fuchsia_hardware_block_volume::VolumeProxy;
16use fuchsia_sync::Mutex;
17use futures::channel::oneshot;
18use futures::executor::block_on;
19use std::collections::HashMap;
20use std::future::Future;
21use std::hash::{Hash, Hasher};
22use std::mem::MaybeUninit;
23use std::num::NonZero;
24use std::ops::{DerefMut, Range};
25use std::pin::Pin;
26use std::sync::atomic::{AtomicU16, Ordering};
27use std::sync::{Arc, LazyLock};
28use std::task::{Context, Poll, Waker};
29use zx::sys::zx_handle_t;
30use zx::{self as zx, HandleBased as _};
31use {
32    fidl_fuchsia_hardware_block as block, fidl_fuchsia_hardware_block_driver as block_driver,
33    fuchsia_async as fasync, storage_trace as trace,
34};
35
36pub use cache::Cache;
37
38pub use block::Flag as BlockFlags;
39
40pub use block_protocol::*;
41
42pub mod cache;
43
44const TEMP_VMO_SIZE: usize = 65536;
45
46/// If a trace flow ID isn't specified for requests, one will be generated.
47pub const NO_TRACE_ID: u64 = 0;
48
49pub use block_driver::{BlockIoFlag, BlockOpcode};
50
51fn fidl_to_status(error: fidl::Error) -> zx::Status {
52    match error {
53        fidl::Error::ClientChannelClosed { status, .. } => status,
54        _ => zx::Status::INTERNAL,
55    }
56}
57
58fn opcode_str(opcode: u8) -> &'static str {
59    match BlockOpcode::from_primitive(opcode) {
60        Some(BlockOpcode::Read) => "read",
61        Some(BlockOpcode::Write) => "write",
62        Some(BlockOpcode::Flush) => "flush",
63        Some(BlockOpcode::Trim) => "trim",
64        Some(BlockOpcode::CloseVmo) => "close_vmo",
65        None => "unknown",
66    }
67}
68
69// Generates a trace ID that will be unique across the system (as long as |request_id| isn't
70// reused within this process).
71fn generate_trace_flow_id(request_id: u32) -> u64 {
72    static SELF_HANDLE: LazyLock<zx_handle_t> =
73        LazyLock::new(|| fuchsia_runtime::process_self().raw_handle());
74    *SELF_HANDLE as u64 + (request_id as u64) << 32
75}
76
/// A read-only source buffer for a write to the device.
pub enum BufferSlice<'a> {
    /// A region of an attached VMO.  `offset` and `length` are byte values; they are converted to
    /// block units before being sent, so they must be multiples of the block size.
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    /// A slice of ordinary memory.
    Memory(&'a [u8]),
}
81
82impl<'a> BufferSlice<'a> {
83    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
84        BufferSlice::VmoId { vmo_id, offset, length }
85    }
86}
87
88impl<'a> From<&'a [u8]> for BufferSlice<'a> {
89    fn from(buf: &'a [u8]) -> Self {
90        BufferSlice::Memory(buf)
91    }
92}
93
/// A writable destination buffer for a read from the device.
pub enum MutableBufferSlice<'a> {
    /// A region of an attached VMO.  `offset` and `length` are byte values; they are converted to
    /// block units before being sent, so they must be multiples of the block size.
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    /// A mutable slice of ordinary memory.
    Memory(&'a mut [u8]),
}
98
99impl<'a> MutableBufferSlice<'a> {
100    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
101        MutableBufferSlice::VmoId { vmo_id, offset, length }
102    }
103}
104
105impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
106    fn from(buf: &'a mut [u8]) -> Self {
107        MutableBufferSlice::Memory(buf)
108    }
109}
110
// Per-request bookkeeping shared between the sender of a request and whoever completes it.
#[derive(Default)]
struct RequestState {
    // The outcome of the request; `None` until one has been recorded.
    result: Option<zx::Status>,
    // The waker of the task awaiting this request, stored when it polls before a result exists.
    waker: Option<Waker>,
}
116
// State shared (behind a mutex) between request senders and the fifo poller.
#[derive(Default)]
struct FifoState {
    // The fifo.  `None` once the connection has been terminated.
    fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,

    // The next request ID to be used.
    next_request_id: u32,

    // A queue of messages to be sent on the fifo.
    queue: std::collections::VecDeque<BlockFifoRequest>,

    // Map from request ID to RequestState.
    map: HashMap<u32, RequestState>,

    // The waker for the FifoPoller.
    poller_waker: Option<Waker>,

    // If set, attach a barrier to the next write request
    attach_barrier: bool,
}
137
138impl FifoState {
139    fn terminate(&mut self) {
140        self.fifo.take();
141        for (_, request_state) in self.map.iter_mut() {
142            request_state.result.get_or_insert(zx::Status::CANCELED);
143            if let Some(waker) = request_state.waker.take() {
144                waker.wake();
145            }
146        }
147        if let Some(waker) = self.poller_waker.take() {
148            waker.wake();
149        }
150    }
151
152    // Returns true if polling should be terminated.
153    fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
154        let fifo = if let Some(fifo) = self.fifo.as_ref() {
155            fifo
156        } else {
157            return true;
158        };
159
160        loop {
161            let slice = self.queue.as_slices().0;
162            if slice.is_empty() {
163                return false;
164            }
165            match fifo.try_write(context, slice) {
166                Poll::Ready(Ok(sent)) => {
167                    self.queue.drain(0..sent);
168                }
169                Poll::Ready(Err(_)) => {
170                    self.terminate();
171                    return true;
172                }
173                Poll::Pending => {
174                    return false;
175                }
176            }
177        }
178    }
179}
180
// Shared, mutex-protected handle to the fifo state.
type FifoStateRef = Arc<Mutex<FifoState>>;

// A future used for fifo responses.
struct ResponseFuture {
    // The ID of the request this future is waiting on; the key into `FifoState::map`.
    request_id: u32,
    fifo_state: FifoStateRef,
}
188
189impl ResponseFuture {
190    fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
191        ResponseFuture { request_id, fifo_state }
192    }
193}
194
195impl Future for ResponseFuture {
196    type Output = Result<(), zx::Status>;
197
198    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
199        let mut state = self.fifo_state.lock();
200        let request_state = state.map.get_mut(&self.request_id).unwrap();
201        if let Some(result) = request_state.result {
202            Poll::Ready(result.into())
203        } else {
204            request_state.waker.replace(context.waker().clone());
205            Poll::Pending
206        }
207    }
208}
209
210impl Drop for ResponseFuture {
211    fn drop(&mut self) {
212        self.fifo_state.lock().map.remove(&self.request_id).unwrap();
213    }
214}
215
/// Wraps a vmo-id. Will panic if you forget to detach.
///
/// The ID is held in an atomic so it can be invalidated through a shared reference (see
/// [`VmoId::take`]).  Dropping a `VmoId` whose ID is not `BLOCK_VMOID_INVALID` panics.
#[derive(Debug)]
#[must_use]
pub struct VmoId(AtomicU16);
220
221impl VmoId {
222    /// VmoIds will normally be vended by attach_vmo, but this might be used in some tests
223    pub fn new(id: u16) -> Self {
224        Self(AtomicU16::new(id))
225    }
226
227    /// Invalidates self and returns a new VmoId with the same underlying ID.
228    pub fn take(&self) -> Self {
229        Self(AtomicU16::new(self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)))
230    }
231
232    pub fn is_valid(&self) -> bool {
233        self.id() != block_driver::BLOCK_VMOID_INVALID
234    }
235
236    /// Takes the ID.  The caller assumes responsibility for detaching.
237    #[must_use]
238    pub fn into_id(self) -> u16 {
239        self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)
240    }
241
242    pub fn id(&self) -> u16 {
243        self.0.load(Ordering::Relaxed)
244    }
245}
246
247impl PartialEq for VmoId {
248    fn eq(&self, other: &Self) -> bool {
249        self.id() == other.id()
250    }
251}
252
253impl Eq for VmoId {}
254
255impl Drop for VmoId {
256    fn drop(&mut self) {
257        assert_eq!(
258            self.0.load(Ordering::Relaxed),
259            block_driver::BLOCK_VMOID_INVALID,
260            "Did you forget to detach?"
261        );
262    }
263}
264
265impl Hash for VmoId {
266    fn hash<H: Hasher>(&self, state: &mut H) {
267        self.id().hash(state);
268    }
269}
270
271/// Represents a client connection to a block device. This is a simplified version of the block.fidl
272/// interface.
273/// Most users will use the RemoteBlockClient instantiation of this trait.
274pub trait BlockClient: Send + Sync {
275    /// Wraps AttachVmo from fuchsia.hardware.block::Block.
276    fn attach_vmo(&self, vmo: &zx::Vmo) -> impl Future<Output = Result<VmoId, zx::Status>> + Send;
277
278    /// Detaches the given vmo-id from the device.
279    fn detach_vmo(&self, vmo_id: VmoId) -> impl Future<Output = Result<(), zx::Status>> + Send;
280
281    /// Reads from the device at |device_offset| into the given buffer slice.
282    fn read_at(
283        &self,
284        buffer_slice: MutableBufferSlice<'_>,
285        device_offset: u64,
286    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
287        self.read_at_with_opts_traced(buffer_slice, device_offset, ReadOptions::default(), 0)
288    }
289
290    fn read_at_with_opts(
291        &self,
292        buffer_slice: MutableBufferSlice<'_>,
293        device_offset: u64,
294        opts: ReadOptions,
295    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
296        self.read_at_with_opts_traced(buffer_slice, device_offset, opts, 0)
297    }
298
299    fn read_at_with_opts_traced(
300        &self,
301        buffer_slice: MutableBufferSlice<'_>,
302        device_offset: u64,
303        opts: ReadOptions,
304        trace_flow_id: u64,
305    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
306
307    /// Writes the data in |buffer_slice| to the device.
308    fn write_at(
309        &self,
310        buffer_slice: BufferSlice<'_>,
311        device_offset: u64,
312    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
313        self.write_at_with_opts_traced(
314            buffer_slice,
315            device_offset,
316            WriteOptions::default(),
317            NO_TRACE_ID,
318        )
319    }
320
321    fn write_at_with_opts(
322        &self,
323        buffer_slice: BufferSlice<'_>,
324        device_offset: u64,
325        opts: WriteOptions,
326    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
327        self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID)
328    }
329
330    fn write_at_with_opts_traced(
331        &self,
332        buffer_slice: BufferSlice<'_>,
333        device_offset: u64,
334        opts: WriteOptions,
335        trace_flow_id: u64,
336    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
337
338    /// Trims the given range on the block device.
339    fn trim(
340        &self,
341        device_range: Range<u64>,
342    ) -> impl Future<Output = Result<(), zx::Status>> + Send {
343        self.trim_traced(device_range, NO_TRACE_ID)
344    }
345
346    fn trim_traced(
347        &self,
348        device_range: Range<u64>,
349        trace_flow_id: u64,
350    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
351
352    /// Attaches a barrier to the next write sent to the underlying block device. This barrier
353    /// method is an alternative to setting the WriteOption::PRE_BARRIER on `write_at_with_opts`.
354    /// This method makes it easier to guarantee that the barrier is attached to the correct
355    /// write operation when subsequent write operations can get reordered.
356    fn barrier(&self);
357
358    fn flush(&self) -> impl Future<Output = Result<(), zx::Status>> + Send {
359        self.flush_traced(NO_TRACE_ID)
360    }
361
362    /// Sends a flush request to the underlying block device.
363    fn flush_traced(
364        &self,
365        trace_flow_id: u64,
366    ) -> impl Future<Output = Result<(), zx::Status>> + Send;
367
368    /// Closes the fifo.
369    fn close(&self) -> impl Future<Output = Result<(), zx::Status>> + Send;
370
371    /// Returns the block size of the device.
372    fn block_size(&self) -> u32;
373
374    /// Returns the size, in blocks, of the device.
375    fn block_count(&self) -> u64;
376
377    /// Returns the maximum number of blocks which can be transferred in a single request.
378    fn max_transfer_blocks(&self) -> Option<NonZero<u32>>;
379
380    /// Returns the block flags reported by the device.
381    fn block_flags(&self) -> BlockFlags;
382
383    /// Returns true if the remote fifo is still connected.
384    fn is_connected(&self) -> bool;
385}
386
// State and logic shared by the asynchronous and synchronous remote clients.
struct Common {
    // Device geometry and flags, copied from the `BlockInfo` supplied at construction.
    block_size: u32,
    block_count: u64,
    // `None` when the device reports an unbounded transfer size (or the bound is less than one
    // block).
    max_transfer_blocks: Option<NonZero<u32>>,
    block_flags: BlockFlags,
    fifo_state: FifoStateRef,
    // Scratch VMO used to stage transfers for `Memory` buffer slices.  The lock is held for the
    // duration of such a transfer.
    temp_vmo: futures::lock::Mutex<zx::Vmo>,
    // The vmo-id used in requests that go through `temp_vmo`.
    temp_vmo_id: VmoId,
}
396
397impl Common {
398    fn new(
399        fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
400        info: &block::BlockInfo,
401        temp_vmo: zx::Vmo,
402        temp_vmo_id: VmoId,
403    ) -> Self {
404        let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
405        fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
406        Self {
407            block_size: info.block_size,
408            block_count: info.block_count,
409            max_transfer_blocks: if info.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
410                NonZero::new(info.max_transfer_size / info.block_size)
411            } else {
412                None
413            },
414            block_flags: info.flags,
415            fifo_state,
416            temp_vmo: futures::lock::Mutex::new(temp_vmo),
417            temp_vmo_id,
418        }
419    }
420
421    fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
422        if bytes % self.block_size as u64 != 0 {
423            Err(zx::Status::INVALID_ARGS)
424        } else {
425            Ok(bytes / self.block_size as u64)
426        }
427    }
428
    // Sends the request and waits for the response.
    //
    // Assigns the request its ID (and a trace flow ID if none was supplied), queues it for the
    // fifo and then awaits the matching `ResponseFuture`.  Returns `zx::Status::CANCELED` without
    // queueing anything if the fifo has already been closed.
    async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
        async move {
            let (request_id, trace_flow_id) = {
                // The lock is only held inside this block; it is released before awaiting the
                // response.
                let mut state = self.fifo_state.lock();

                // A barrier requested via `barrier()` is attached to the first write request that
                // comes through here, and the pending flag is then cleared.
                // NOTE(review): the flag is consumed before the closed-fifo check below.
                let mut flags = BlockIoFlag::from_bits_retain(request.command.flags);
                if BlockOpcode::from_primitive(request.command.opcode) == Some(BlockOpcode::Write)
                    && state.attach_barrier
                {
                    flags |= BlockIoFlag::PRE_BARRIER;
                    request.command.flags = flags.bits();
                    state.attach_barrier = false;
                }

                if state.fifo.is_none() {
                    // Fifo has been closed.
                    return Err(zx::Status::CANCELED);
                }
                trace::duration!(
                    c"storage",
                    c"block_client::send::start",
                    "op" => opcode_str(request.command.opcode)
                );
                // Request IDs wrap around on overflow; the assert below catches an ID that is
                // still outstanding.
                let request_id = state.next_request_id;
                state.next_request_id = state.next_request_id.overflowing_add(1).0;
                assert!(
                    state.map.insert(request_id, RequestState::default()).is_none(),
                    "request id in use!"
                );
                request.reqid = request_id;
                if request.trace_flow_id == NO_TRACE_ID {
                    request.trace_flow_id = generate_trace_flow_id(request_id);
                }
                let trace_flow_id = request.trace_flow_id;
                trace::flow_begin!(c"storage", c"block_client::send", trace_flow_id);
                state.queue.push_back(request);
                // If the poller has registered a waker, try to push the queue to the fifo right
                // away using that waker as the polling context.
                if let Some(waker) = state.poller_waker.clone() {
                    state.poll_send_requests(&mut Context::from_waker(&waker));
                }
                (request_id, trace_flow_id)
            };
            ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
            trace::duration!(c"storage", c"block_client::send::end");
            trace::flow_end!(c"storage", c"block_client::send", trace_flow_id);
            Ok(())
        }
        .await
    }
478
479    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
480        self.send(BlockFifoRequest {
481            command: BlockFifoCommand {
482                opcode: BlockOpcode::CloseVmo.into_primitive(),
483                flags: 0,
484                ..Default::default()
485            },
486            vmoid: vmo_id.into_id(),
487            ..Default::default()
488        })
489        .await
490    }
491
492    async fn read_at(
493        &self,
494        buffer_slice: MutableBufferSlice<'_>,
495        device_offset: u64,
496        opts: ReadOptions,
497        trace_flow_id: u64,
498    ) -> Result<(), zx::Status> {
499        match buffer_slice {
500            MutableBufferSlice::VmoId { vmo_id, offset, length } => {
501                self.send(BlockFifoRequest {
502                    command: BlockFifoCommand {
503                        opcode: BlockOpcode::Read.into_primitive(),
504                        flags: 0,
505                        ..Default::default()
506                    },
507                    vmoid: vmo_id.id(),
508                    length: self
509                        .to_blocks(length)?
510                        .try_into()
511                        .map_err(|_| zx::Status::INVALID_ARGS)?,
512                    vmo_offset: self.to_blocks(offset)?,
513                    dev_offset: self.to_blocks(device_offset)?,
514                    trace_flow_id,
515                    dun: opts.inline_crypto_options.dun,
516                    slot: opts.inline_crypto_options.slot,
517                    ..Default::default()
518                })
519                .await?
520            }
521            MutableBufferSlice::Memory(mut slice) => {
522                let temp_vmo = self.temp_vmo.lock().await;
523                let mut device_block = self.to_blocks(device_offset)?;
524                loop {
525                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
526                    let block_count = self.to_blocks(to_do as u64)? as u32;
527                    self.send(BlockFifoRequest {
528                        command: BlockFifoCommand {
529                            opcode: BlockOpcode::Read.into_primitive(),
530                            flags: 0,
531                            ..Default::default()
532                        },
533                        vmoid: self.temp_vmo_id.id(),
534                        length: block_count,
535                        vmo_offset: 0,
536                        dev_offset: device_block,
537                        trace_flow_id,
538                        dun: opts.inline_crypto_options.dun,
539                        slot: opts.inline_crypto_options.slot,
540                        ..Default::default()
541                    })
542                    .await?;
543                    temp_vmo.read(&mut slice[..to_do], 0)?;
544                    if to_do == slice.len() {
545                        break;
546                    }
547                    device_block += block_count as u64;
548                    slice = &mut slice[to_do..];
549                }
550            }
551        }
552        Ok(())
553    }
554
555    async fn write_at(
556        &self,
557        buffer_slice: BufferSlice<'_>,
558        device_offset: u64,
559        opts: WriteOptions,
560        trace_flow_id: u64,
561    ) -> Result<(), zx::Status> {
562        let mut flags = BlockIoFlag::empty();
563
564        if opts.flags.contains(WriteFlags::FORCE_ACCESS) {
565            flags |= BlockIoFlag::FORCE_ACCESS;
566        }
567
568        if opts.flags.contains(WriteFlags::PRE_BARRIER) {
569            flags |= BlockIoFlag::PRE_BARRIER;
570        }
571
572        match buffer_slice {
573            BufferSlice::VmoId { vmo_id, offset, length } => {
574                self.send(BlockFifoRequest {
575                    command: BlockFifoCommand {
576                        opcode: BlockOpcode::Write.into_primitive(),
577                        flags: flags.bits(),
578                        ..Default::default()
579                    },
580                    vmoid: vmo_id.id(),
581                    length: self
582                        .to_blocks(length)?
583                        .try_into()
584                        .map_err(|_| zx::Status::INVALID_ARGS)?,
585                    vmo_offset: self.to_blocks(offset)?,
586                    dev_offset: self.to_blocks(device_offset)?,
587                    trace_flow_id,
588                    dun: opts.inline_crypto_options.dun,
589                    slot: opts.inline_crypto_options.slot,
590                    ..Default::default()
591                })
592                .await?;
593            }
594            BufferSlice::Memory(mut slice) => {
595                let temp_vmo = self.temp_vmo.lock().await;
596                let mut device_block = self.to_blocks(device_offset)?;
597                loop {
598                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
599                    let block_count = self.to_blocks(to_do as u64)? as u32;
600                    temp_vmo.write(&slice[..to_do], 0)?;
601                    self.send(BlockFifoRequest {
602                        command: BlockFifoCommand {
603                            opcode: BlockOpcode::Write.into_primitive(),
604                            flags: flags.bits(),
605                            ..Default::default()
606                        },
607                        vmoid: self.temp_vmo_id.id(),
608                        length: block_count,
609                        vmo_offset: 0,
610                        dev_offset: device_block,
611                        trace_flow_id,
612                        dun: opts.inline_crypto_options.dun,
613                        slot: opts.inline_crypto_options.slot,
614                        ..Default::default()
615                    })
616                    .await?;
617                    if to_do == slice.len() {
618                        break;
619                    }
620                    device_block += block_count as u64;
621                    slice = &slice[to_do..];
622                }
623            }
624        }
625        Ok(())
626    }
627
628    async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
629        let length = self.to_blocks(device_range.end - device_range.start)? as u32;
630        let dev_offset = self.to_blocks(device_range.start)?;
631        self.send(BlockFifoRequest {
632            command: BlockFifoCommand {
633                opcode: BlockOpcode::Trim.into_primitive(),
634                flags: 0,
635                ..Default::default()
636            },
637            vmoid: block_driver::BLOCK_VMOID_INVALID,
638            length,
639            dev_offset,
640            trace_flow_id,
641            ..Default::default()
642        })
643        .await
644    }
645
646    async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
647        self.send(BlockFifoRequest {
648            command: BlockFifoCommand {
649                opcode: BlockOpcode::Flush.into_primitive(),
650                flags: 0,
651                ..Default::default()
652            },
653            vmoid: block_driver::BLOCK_VMOID_INVALID,
654            trace_flow_id,
655            ..Default::default()
656        })
657        .await
658    }
659
    // Marks that a barrier should be attached to the next write request passed to `send`.
    fn barrier(&self) {
        self.fifo_state.lock().attach_barrier = true;
    }
663
    // Returns the block size recorded from the device's `BlockInfo`.
    fn block_size(&self) -> u32 {
        self.block_size
    }
667
    // Returns the block count recorded from the device's `BlockInfo`.
    fn block_count(&self) -> u64 {
        self.block_count
    }
671
672    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
673        self.max_transfer_blocks.clone()
674    }
675
    // Returns the flags recorded from the device's `BlockInfo`.
    fn block_flags(&self) -> BlockFlags {
        self.block_flags
    }
679
    // Returns true while the fifo is still held, i.e. `FifoState::terminate` has not run.
    fn is_connected(&self) -> bool {
        self.fifo_state.lock().fifo.is_some()
    }
683}
684
685impl Drop for Common {
686    fn drop(&mut self) {
687        // It's OK to leak the VMO id because the server will dump all VMOs when the fifo is torn
688        // down.
689        let _ = self.temp_vmo_id.take().into_id();
690        self.fifo_state.lock().terminate();
691    }
692}
693
/// RemoteBlockClient is a BlockClient that communicates with a real block device over FIDL.
pub struct RemoteBlockClient {
    // The session proxy, used for attaching VMOs and closing the session.
    session: block::SessionProxy,
    // Fifo-based request handling shared with `RemoteBlockClientSync`.
    common: Common,
}
699
/// Abstraction over the FIDL proxies that can be used to connect a [`RemoteBlockClient`]: anything
/// that can report block info and open a block session.
pub trait AsBlockProxy {
    /// Fetches the device's block info.
    fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>>;

    /// Opens a new session, served on the `session` server end.
    fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error>;
}
705
706impl<T: AsBlockProxy> AsBlockProxy for &T {
707    fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>> {
708        AsBlockProxy::get_info(*self)
709    }
710    fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error> {
711        AsBlockProxy::open_session(*self, session)
712    }
713}
714
// Implements `AsBlockProxy` for a proxy type by forwarding to the type's own `get_info` and
// `open_session` methods.
macro_rules! impl_as_block_proxy {
    ($name:ident) => {
        impl AsBlockProxy for $name {
            async fn get_info(&self) -> Result<block::BlockGetInfoResult, fidl::Error> {
                $name::get_info(self).await
            }

            fn open_session(
                &self,
                session: ServerEnd<block::SessionMarker>,
            ) -> Result<(), fidl::Error> {
                $name::open_session(self, session)
            }
        }
    };
}

// The proxy types that can be handed to `RemoteBlockClient::new`.
impl_as_block_proxy!(BlockProxy);
impl_as_block_proxy!(PartitionProxy);
impl_as_block_proxy!(VolumeProxy);
735
736impl RemoteBlockClient {
737    /// Returns a connection to a remote block device via the given channel.
738    pub async fn new(remote: impl AsBlockProxy) -> Result<Self, zx::Status> {
739        let info =
740            remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
741        let (session, server) = fidl::endpoints::create_proxy();
742        let () = remote.open_session(server).map_err(fidl_to_status)?;
743        Self::from_session(info, session).await
744    }
745
746    pub async fn from_session(
747        info: block::BlockInfo,
748        session: block::SessionProxy,
749    ) -> Result<Self, zx::Status> {
750        let fifo =
751            session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
752        let fifo = fasync::Fifo::from_fifo(fifo);
753        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
754        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
755        let vmo_id =
756            session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
757        let vmo_id = VmoId::new(vmo_id.id);
758        Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
759    }
760}
761
762impl BlockClient for RemoteBlockClient {
763    async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
764        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
765        let vmo_id = self
766            .session
767            .attach_vmo(dup)
768            .await
769            .map_err(fidl_to_status)?
770            .map_err(zx::Status::from_raw)?;
771        Ok(VmoId::new(vmo_id.id))
772    }
773
774    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
775        self.common.detach_vmo(vmo_id).await
776    }
777
778    async fn read_at_with_opts_traced(
779        &self,
780        buffer_slice: MutableBufferSlice<'_>,
781        device_offset: u64,
782        opts: ReadOptions,
783        trace_flow_id: u64,
784    ) -> Result<(), zx::Status> {
785        self.common.read_at(buffer_slice, device_offset, opts, trace_flow_id).await
786    }
787
788    async fn write_at_with_opts_traced(
789        &self,
790        buffer_slice: BufferSlice<'_>,
791        device_offset: u64,
792        opts: WriteOptions,
793        trace_flow_id: u64,
794    ) -> Result<(), zx::Status> {
795        self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
796    }
797
798    async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
799        self.common.trim(range, trace_flow_id).await
800    }
801
802    async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
803        self.common.flush(trace_flow_id).await
804    }
805
806    fn barrier(&self) {
807        self.common.barrier()
808    }
809
810    async fn close(&self) -> Result<(), zx::Status> {
811        let () =
812            self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
813        Ok(())
814    }
815
816    fn block_size(&self) -> u32 {
817        self.common.block_size()
818    }
819
820    fn block_count(&self) -> u64 {
821        self.common.block_count()
822    }
823
824    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
825        self.common.max_transfer_blocks()
826    }
827
828    fn block_flags(&self) -> BlockFlags {
829        self.common.block_flags()
830    }
831
832    fn is_connected(&self) -> bool {
833        self.common.is_connected()
834    }
835}
836
/// A blocking counterpart to [`RemoteBlockClient`]: its methods use a synchronous session proxy
/// or block the calling thread until the underlying request completes.
pub struct RemoteBlockClientSync {
    // The synchronous session proxy, used for attaching VMOs and closing the session.
    session: block::SessionSynchronousProxy,
    // Fifo-based request handling shared with `RemoteBlockClient`.
    common: Common,
}
841
842impl RemoteBlockClientSync {
843    /// Returns a connection to a remote block device via the given channel, but spawns a separate
844    /// thread for polling the fifo which makes it work in cases where no executor is configured for
845    /// the calling thread.
846    pub fn new(
847        client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
848    ) -> Result<Self, zx::Status> {
849        let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
850        let info = remote
851            .get_info(zx::MonotonicInstant::INFINITE)
852            .map_err(fidl_to_status)?
853            .map_err(zx::Status::from_raw)?;
854        let (client, server) = fidl::endpoints::create_endpoints();
855        let () = remote.open_session(server).map_err(fidl_to_status)?;
856        let session = block::SessionSynchronousProxy::new(client.into_channel());
857        let fifo = session
858            .get_fifo(zx::MonotonicInstant::INFINITE)
859            .map_err(fidl_to_status)?
860            .map_err(zx::Status::from_raw)?;
861        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
862        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
863        let vmo_id = session
864            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
865            .map_err(fidl_to_status)?
866            .map_err(zx::Status::from_raw)?;
867        let vmo_id = VmoId::new(vmo_id.id);
868
869        // The fifo needs to be instantiated from the thread that has the executor as that's where
870        // the fifo registers for notifications to be delivered.
871        let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
872        std::thread::spawn(move || {
873            let mut executor = fasync::LocalExecutor::default();
874            let fifo = fasync::Fifo::from_fifo(fifo);
875            let common = Common::new(fifo, &info, temp_vmo, vmo_id);
876            let fifo_state = common.fifo_state.clone();
877            let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
878            executor.run_singlethreaded(FifoPoller { fifo_state });
879        });
880        block_on(receiver).map_err(|_| zx::Status::CANCELED)?
881    }
882
883    pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
884        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
885        let vmo_id = self
886            .session
887            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
888            .map_err(fidl_to_status)?
889            .map_err(zx::Status::from_raw)?;
890        Ok(VmoId::new(vmo_id.id))
891    }
892
893    pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
894        block_on(self.common.detach_vmo(vmo_id))
895    }
896
897    pub fn read_at(
898        &self,
899        buffer_slice: MutableBufferSlice<'_>,
900        device_offset: u64,
901    ) -> Result<(), zx::Status> {
902        block_on(self.common.read_at(
903            buffer_slice,
904            device_offset,
905            ReadOptions::default(),
906            NO_TRACE_ID,
907        ))
908    }
909
910    pub fn write_at(
911        &self,
912        buffer_slice: BufferSlice<'_>,
913        device_offset: u64,
914    ) -> Result<(), zx::Status> {
915        block_on(self.common.write_at(
916            buffer_slice,
917            device_offset,
918            WriteOptions::default(),
919            NO_TRACE_ID,
920        ))
921    }
922
923    pub fn flush(&self) -> Result<(), zx::Status> {
924        block_on(self.common.flush(NO_TRACE_ID))
925    }
926
927    pub fn close(&self) -> Result<(), zx::Status> {
928        let () = self
929            .session
930            .close(zx::MonotonicInstant::INFINITE)
931            .map_err(fidl_to_status)?
932            .map_err(zx::Status::from_raw)?;
933        Ok(())
934    }
935
936    pub fn block_size(&self) -> u32 {
937        self.common.block_size()
938    }
939
940    pub fn block_count(&self) -> u64 {
941        self.common.block_count()
942    }
943
944    pub fn is_connected(&self) -> bool {
945        self.common.is_connected()
946    }
947}
948
949impl Drop for RemoteBlockClientSync {
950    fn drop(&mut self) {
951        // Ignore errors here as there is not much we can do about it.
952        let _ = self.close();
953    }
954}
955
/// A future responsible for sending requests to, and receiving responses from, the fifo.
///
/// It resolves (with `()`) once the fifo can no longer be used; until then it stays pending.
struct FifoPoller {
    /// Shared fifo state; locked for the duration of each `poll` call.
    fifo_state: FifoStateRef,
}
960
961impl Future for FifoPoller {
962    type Output = ();
963
964    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
965        let mut state_lock = self.fifo_state.lock();
966        let state = state_lock.deref_mut(); // So that we can split the borrow.
967
968        // Send requests.
969        if state.poll_send_requests(context) {
970            return Poll::Ready(());
971        }
972
973        // Receive responses.
974        let fifo = state.fifo.as_ref().unwrap(); // Safe because poll_send_requests checks.
975        loop {
976            let mut response = MaybeUninit::uninit();
977            match fifo.try_read(context, &mut response) {
978                Poll::Pending => {
979                    state.poller_waker = Some(context.waker().clone());
980                    return Poll::Pending;
981                }
982                Poll::Ready(Ok(_)) => {
983                    let response = unsafe { response.assume_init() };
984                    let request_id = response.reqid;
985                    // If the request isn't in the map, assume that it's a cancelled read.
986                    if let Some(request_state) = state.map.get_mut(&request_id) {
987                        request_state.result.replace(zx::Status::from_raw(response.status));
988                        if let Some(waker) = request_state.waker.take() {
989                            waker.wake();
990                        }
991                    }
992                }
993                Poll::Ready(Err(_)) => {
994                    state.terminate();
995                    return Poll::Ready(());
996                }
997            }
998        }
999    }
1000}
1001
1002#[cfg(test)]
1003mod tests {
1004    use super::{
1005        BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
1006        RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
1007    };
1008    use block_protocol::ReadOptions;
1009    use block_server::{BlockServer, DeviceInfo, PartitionInfo};
1010    use fidl::endpoints::RequestStream as _;
1011    use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
1012    use futures::join;
1013    use futures::stream::StreamExt as _;
1014    use futures::stream::futures_unordered::FuturesUnordered;
1015    use ramdevice_client::RamdiskClient;
1016    use std::borrow::Cow;
1017    use std::num::NonZero;
1018    use std::sync::Arc;
1019    use std::sync::atomic::{AtomicBool, Ordering};
1020    use {fidl_fuchsia_hardware_block as block, fuchsia_async as fasync};
1021
    // Block size passed to `RamdiskClient::create` for the test ramdisk.
    const RAMDISK_BLOCK_SIZE: u64 = 1024;
    // Block count passed to `RamdiskClient::create` for the test ramdisk.
    const RAMDISK_BLOCK_COUNT: u64 = 1024;
1024
1025    pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
1026        let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
1027            .await
1028            .expect("RamdiskClient::create failed");
1029        let client_end = ramdisk.open().expect("ramdisk.open failed");
1030        let proxy = client_end.into_proxy();
1031        let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1032        assert_eq!(block_client.block_size(), 1024);
1033        let client_end = ramdisk.open().expect("ramdisk.open failed");
1034        let proxy = client_end.into_proxy();
1035        (ramdisk, proxy, block_client)
1036    }
1037
1038    #[fuchsia::test]
1039    async fn test_against_ram_disk() {
1040        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1041
1042        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1043        vmo.write(b"hello", 5).expect("vmo.write failed");
1044        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1045        block_client
1046            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1047            .await
1048            .expect("write_at failed");
1049        block_client
1050            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
1051            .await
1052            .expect("read_at failed");
1053        let mut buf: [u8; 5] = Default::default();
1054        vmo.read(&mut buf, 1029).expect("vmo.read failed");
1055        assert_eq!(&buf, b"hello");
1056        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1057    }
1058
1059    #[fuchsia::test]
1060    async fn test_alignment() {
1061        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1062        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1063        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1064        block_client
1065            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
1066            .await
1067            .expect_err("expected failure due to bad alignment");
1068        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1069    }
1070
1071    #[fuchsia::test]
1072    async fn test_parallel_io() {
1073        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1074        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1075        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1076        let mut reads = Vec::new();
1077        for _ in 0..1024 {
1078            reads.push(
1079                block_client
1080                    .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1081                    .inspect_err(|e| panic!("read should have succeeded: {}", e)),
1082            );
1083        }
1084        futures::future::join_all(reads).await;
1085        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1086    }
1087
1088    #[fuchsia::test]
1089    async fn test_closed_device() {
1090        let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1091        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1092        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1093        let mut reads = Vec::new();
1094        for _ in 0..1024 {
1095            reads.push(
1096                block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1097            );
1098        }
1099        assert!(block_client.is_connected());
1100        let _ = futures::join!(futures::future::join_all(reads), async {
1101            ramdisk.destroy().await.expect("ramdisk.destroy failed")
1102        });
1103        // Destroying the ramdisk is asynchronous. Keep issuing reads until they start failing.
1104        while block_client
1105            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1106            .await
1107            .is_ok()
1108        {}
1109
1110        // Sometimes the FIFO will start rejecting requests before FIFO is actually closed, so we
1111        // get false-positives from is_connected.
1112        while block_client.is_connected() {
1113            // Sleep for a bit to minimise lock contention.
1114            fasync::Timer::new(fasync::MonotonicInstant::after(
1115                zx::MonotonicDuration::from_millis(500),
1116            ))
1117            .await;
1118        }
1119
1120        // But once is_connected goes negative, it should stay negative.
1121        assert_eq!(block_client.is_connected(), false);
1122        let _ = block_client.detach_vmo(vmo_id).await;
1123    }
1124
1125    #[fuchsia::test]
1126    async fn test_cancelled_reads() {
1127        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1128        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1129        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1130        {
1131            let mut reads = FuturesUnordered::new();
1132            for _ in 0..1024 {
1133                reads.push(
1134                    block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1135                );
1136            }
1137            // Read the first 500 results and then dump the rest.
1138            for _ in 0..500 {
1139                reads.next().await;
1140            }
1141        }
1142        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1143    }
1144
1145    #[fuchsia::test]
1146    async fn test_parallel_large_read_and_write_with_memory_succeds() {
1147        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1148        let block_client_ref = &block_client;
1149        let test_one = |offset, len, fill| async move {
1150            let buf = vec![fill; len];
1151            block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
1152            // Read back an extra block either side.
1153            let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
1154            block_client_ref
1155                .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
1156                .await
1157                .expect("read_at failed");
1158            assert_eq!(
1159                &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
1160                &[0; RAMDISK_BLOCK_SIZE as usize][..]
1161            );
1162            assert_eq!(
1163                &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
1164                &buf[..]
1165            );
1166            assert_eq!(
1167                &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
1168                &[0; RAMDISK_BLOCK_SIZE as usize][..]
1169            );
1170        };
1171        const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
1172        join!(
1173            test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
1174            test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
1175        );
1176    }
1177
    // Implements a dummy server which test cases can use to verify whether channel messages and
    // fifo operations are being received, by supplying a `channel_handler` or a `fifo_handler`
    // respectively to `FakeBlockServer::new`.
    struct FakeBlockServer<'a> {
        // Server end of the Block channel; taken (leaving `None`) when `run` starts.
        server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
        // Called with each session request before the default handling; returning `true` skips
        // the default handling for that request.
        channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
        // Produces the response that is written back to the fifo for each request read from it.
        fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
    }
1186
    impl<'a> FakeBlockServer<'a> {
        // Creates a new FakeBlockServer given a channel to listen on.
        //
        // The 'channel_handler' and 'fifo_handler' closures customize how the server handles
        // requests received from the channel or the fifo respectively.
        //
        // 'channel_handler' receives a session message before it is handled by the default
        // implementation and can return 'true' to indicate that all processing is done and no
        // further processing of that message is required.
        //
        // 'fifo_handler' takes as input a BlockFifoRequest and produces a response which the
        // FakeBlockServer will send over the fifo.
        fn new(
            server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
            channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
            fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
        ) -> FakeBlockServer<'a> {
            FakeBlockServer {
                server_channel: Some(server_channel),
                channel_handler: Box::new(channel_handler),
                fifo_handler: Box::new(fifo_handler),
            }
        }

        // Runs the server until both the fifo loop and the channel loop have finished.
        //
        // Panics if called a second time, because the server channel is taken on the first call.
        async fn run(&mut self) {
            let server = self.server_channel.take().unwrap();

            // Set up a mock server.
            let (server_fifo, client_fifo) =
                zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
                    .expect("Fifo::create failed");
            // NOTE: despite its name, this holds the *client* end of the fifo, which is handed
            // out in response to the first GetFifo request.
            let maybe_server_fifo = fuchsia_sync::Mutex::new(Some(client_fifo));

            // The fifo loop is abortable so that handling a Close request can stop it.
            let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
            let fifo_future = Abortable::new(
                async {
                    let mut fifo = fasync::Fifo::from_fifo(server_fifo);
                    let (mut reader, mut writer) = fifo.async_io();
                    let mut request = BlockFifoRequest::default();
                    // Read one request at a time, stopping when the peer closes the fifo.
                    loop {
                        match reader.read_entries(&mut request).await {
                            Ok(1) => {}
                            Err(zx::Status::PEER_CLOSED) => break,
                            Err(e) => panic!("read_entry failed {:?}", e),
                            _ => unreachable!(),
                        };

                        // Let the test decide what the response to this request should be.
                        let response = self.fifo_handler.as_ref()(request);
                        writer
                            .write_entries(std::slice::from_ref(&response))
                            .await
                            .expect("write_entries failed");
                    }
                },
                fifo_future_abort_registration,
            );

            // Serves the Block channel: GetInfo and OpenSession are supported, anything else
            // panics.
            let channel_future = async {
                server
                    .into_stream()
                    .for_each_concurrent(None, |request| async {
                        let request = request.expect("unexpected fidl error");

                        match request {
                            block::BlockRequest::GetInfo { responder } => {
                                // Fixed device geometry reported to every client.
                                responder
                                    .send(Ok(&block::BlockInfo {
                                        block_count: 1024,
                                        block_size: 512,
                                        max_transfer_size: 1024 * 1024,
                                        flags: block::Flag::empty(),
                                    }))
                                    .expect("send failed");
                            }
                            block::BlockRequest::OpenSession { session, control_handle: _ } => {
                                let stream = session.into_stream();
                                stream
                                    .for_each(|request| async {
                                        let request = request.expect("unexpected fidl error");
                                        // Give a chance for the test to register and potentially
                                        // handle the event
                                        if self.channel_handler.as_ref()(&request) {
                                            return;
                                        }
                                        match request {
                                            block::SessionRequest::GetFifo { responder } => {
                                                // Only the first GetFifo gets the fifo; later
                                                // ones are answered with NO_RESOURCES.
                                                match maybe_server_fifo.lock().take() {
                                                    Some(fifo) => {
                                                        responder.send(Ok(fifo.downcast()))
                                                    }
                                                    None => responder.send(Err(
                                                        zx::Status::NO_RESOURCES.into_raw(),
                                                    )),
                                                }
                                                .expect("send failed")
                                            }
                                            // Every attached VMO is given the same id.
                                            block::SessionRequest::AttachVmo {
                                                vmo: _,
                                                responder,
                                            } => responder
                                                .send(Ok(&block::VmoId { id: 1 }))
                                                .expect("send failed"),
                                            block::SessionRequest::Close { responder } => {
                                                // Stop the fifo loop before acknowledging.
                                                fifo_future_abort.abort();
                                                responder.send(Ok(())).expect("send failed")
                                            }
                                        }
                                    })
                                    .await
                            }
                            _ => panic!("Unexpected message"),
                        }
                    })
                    .await;
            };

            let _result = join!(fifo_future, channel_future);
            // The fifo half of _result can be Err(Aborted) since handling Close calls .abort,
            // but that's expected.
        }
    }
1308
1309    #[fuchsia::test]
1310    async fn test_block_close_is_called() {
1311        let close_called = fuchsia_sync::Mutex::new(false);
1312        let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();
1313
1314        std::thread::spawn(move || {
1315            let _block_client =
1316                RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
1317            // The drop here should cause Close to be sent.
1318        });
1319
1320        let channel_handler = |request: &block::SessionRequest| -> bool {
1321            if let block::SessionRequest::Close { .. } = request {
1322                *close_called.lock() = true;
1323            }
1324            false
1325        };
1326        FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;
1327
1328        // After the server has finished running, we can check to see that close was called.
1329        assert!(*close_called.lock());
1330    }
1331
1332    #[fuchsia::test]
1333    async fn test_block_flush_is_called() {
1334        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();
1335
1336        struct Interface {
1337            flush_called: Arc<AtomicBool>,
1338        }
1339        impl block_server::async_interface::Interface for Interface {
1340            async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1341                Ok(Cow::Owned(DeviceInfo::Partition(PartitionInfo {
1342                    device_flags: fidl_fuchsia_hardware_block::Flag::empty(),
1343                    max_transfer_blocks: None,
1344                    block_range: Some(0..1000),
1345                    type_guid: [0; 16],
1346                    instance_guid: [0; 16],
1347                    name: "foo".to_string(),
1348                    flags: 0,
1349                })))
1350            }
1351
1352            async fn read(
1353                &self,
1354                _device_block_offset: u64,
1355                _block_count: u32,
1356                _vmo: &Arc<zx::Vmo>,
1357                _vmo_offset: u64,
1358                _opts: ReadOptions,
1359                _trace_flow_id: Option<NonZero<u64>>,
1360            ) -> Result<(), zx::Status> {
1361                unreachable!();
1362            }
1363
1364            fn barrier(&self) -> Result<(), zx::Status> {
1365                unreachable!()
1366            }
1367
1368            async fn write(
1369                &self,
1370                _device_block_offset: u64,
1371                _block_count: u32,
1372                _vmo: &Arc<zx::Vmo>,
1373                _vmo_offset: u64,
1374                _opts: WriteOptions,
1375                _trace_flow_id: Option<NonZero<u64>>,
1376            ) -> Result<(), zx::Status> {
1377                unreachable!();
1378            }
1379
1380            async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1381                self.flush_called.store(true, Ordering::Relaxed);
1382                Ok(())
1383            }
1384
1385            async fn trim(
1386                &self,
1387                _device_block_offset: u64,
1388                _block_count: u32,
1389                _trace_flow_id: Option<NonZero<u64>>,
1390            ) -> Result<(), zx::Status> {
1391                unreachable!();
1392            }
1393        }
1394
1395        let flush_called = Arc::new(AtomicBool::new(false));
1396
1397        futures::join!(
1398            async {
1399                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1400
1401                block_client.flush().await.expect("flush failed");
1402            },
1403            async {
1404                let block_server = BlockServer::new(
1405                    512,
1406                    Arc::new(Interface { flush_called: flush_called.clone() }),
1407                );
1408                block_server.handle_requests(stream.cast_stream()).await.unwrap();
1409            }
1410        );
1411
1412        assert!(flush_called.load(Ordering::Relaxed));
1413    }
1414
1415    #[fuchsia::test]
1416    async fn test_trace_flow_ids_set() {
1417        let (proxy, server) = fidl::endpoints::create_proxy();
1418
1419        futures::join!(
1420            async {
1421                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1422                block_client.flush().await.expect("flush failed");
1423            },
1424            async {
1425                let flow_id: fuchsia_sync::Mutex<Option<u64>> = fuchsia_sync::Mutex::new(None);
1426                let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
1427                    if request.trace_flow_id > 0 {
1428                        *flow_id.lock() = Some(request.trace_flow_id);
1429                    }
1430                    BlockFifoResponse {
1431                        status: zx::Status::OK.into_raw(),
1432                        reqid: request.reqid,
1433                        ..Default::default()
1434                    }
1435                };
1436                FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
1437                // After the server has finished running, verify the trace flow ID was set to some value.
1438                assert!(flow_id.lock().is_some());
1439            }
1440        );
1441    }
1442}