block_client/lib.rs

// Copyright 2020 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

use async_trait::async_trait;
use fidl::endpoints::ServerEnd;
use fidl_fuchsia_hardware_block::{BlockProxy, MAX_TRANSFER_UNBOUNDED};
use fidl_fuchsia_hardware_block_partition::PartitionProxy;
use fidl_fuchsia_hardware_block_volume::VolumeProxy;
use fuchsia_sync::Mutex;
use futures::channel::oneshot;
use futures::executor::block_on;
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::future::Future;
use std::hash::{Hash, Hasher};
use std::mem::MaybeUninit;
use std::num::NonZero;
use std::ops::{DerefMut, Range};
use std::pin::Pin;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use zx::sys::zx_handle_t;
use zx::{self as zx, HandleBased as _};
use {
    fidl_fuchsia_hardware_block as block, fidl_fuchsia_hardware_block_driver as block_driver,
    fuchsia_async as fasync, storage_trace as trace,
};

pub use cache::Cache;

pub use block::Flag as BlockFlags;

pub use block_protocol::*;

pub mod cache;

const TEMP_VMO_SIZE: usize = 65536;

/// If a trace flow ID isn't specified for requests, one will be generated.
pub const NO_TRACE_ID: u64 = 0;

pub use block_driver::{BlockIoFlag, BlockOpcode};

fn fidl_to_status(error: fidl::Error) -> zx::Status {
    match error {
        fidl::Error::ClientChannelClosed { status, .. } => status,
        _ => zx::Status::INTERNAL,
    }
}

fn opcode_str(opcode: u8) -> &'static str {
    match BlockOpcode::from_primitive(opcode) {
        Some(BlockOpcode::Read) => "read",
        Some(BlockOpcode::Write) => "write",
        Some(BlockOpcode::Flush) => "flush",
        Some(BlockOpcode::Trim) => "trim",
        Some(BlockOpcode::CloseVmo) => "close_vmo",
        None => "unknown",
    }
}

// Generates a trace ID that will be unique across the system (as long as |request_id| isn't
// reused within this process).
fn generate_trace_flow_id(request_id: u32) -> u64 {
    lazy_static! {
        static ref SELF_HANDLE: zx_handle_t = fuchsia_runtime::process_self().raw_handle();
    };
    // Note the parentheses around the shift: `+` binds tighter than `<<` in Rust, so without
    // them the handle would be shifted out of the ID entirely.
    *SELF_HANDLE as u64 + ((request_id as u64) << 32)
}
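
// For example (hypothetical values): a raw process handle of 0x1234 combined with a
// request_id of 7 yields 0x1234 + (7 << 32) = 0x0000_0007_0000_1234, i.e. the handle in
// the low 32 bits and the request ID in the high 32 bits.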

pub enum BufferSlice<'a> {
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    Memory(&'a [u8]),
}

impl<'a> BufferSlice<'a> {
    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
        BufferSlice::VmoId { vmo_id, offset, length }
    }
}

impl<'a> From<&'a [u8]> for BufferSlice<'a> {
    fn from(buf: &'a [u8]) -> Self {
        BufferSlice::Memory(buf)
    }
}

pub enum MutableBufferSlice<'a> {
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    Memory(&'a mut [u8]),
}

impl<'a> MutableBufferSlice<'a> {
    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
        MutableBufferSlice::VmoId { vmo_id, offset, length }
    }
}

impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
    fn from(buf: &'a mut [u8]) -> Self {
        MutableBufferSlice::Memory(buf)
    }
}
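
// A minimal sketch (not part of the library) showing the two ways to build a buffer
// slice: a plain byte slice, which the client stages through an internal transfer VMO, or
// an attached `VmoId` plus a byte offset and length, which the device transfers to/from
// directly. `client` and `vmo_id` are assumed to come from an established session;
// offsets and lengths must be block-aligned.
async fn _buffer_slice_example(
    client: &dyn BlockClient,
    vmo_id: &VmoId,
) -> Result<(), zx::Status> {
    // Memory-backed slice: data is copied via the client's temporary VMO.
    let data = vec![0xabu8; 8192];
    client.write_at(BufferSlice::from(&data[..]), 0).await?;
    // VMO-backed slice: the device transfers straight from the registered VMO.
    client.write_at(BufferSlice::new_with_vmo_id(vmo_id, 0, 8192), 8192).await?;
    Ok(())
}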

#[derive(Default)]
struct RequestState {
    result: Option<zx::Status>,
    waker: Option<Waker>,
}

#[derive(Default)]
struct FifoState {
    // The fifo.
    fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,

    // The next request ID to be used.
    next_request_id: u32,

    // A queue of messages to be sent on the fifo.
    queue: std::collections::VecDeque<BlockFifoRequest>,

    // Map from request ID to RequestState.
    map: HashMap<u32, RequestState>,

    // The waker for the FifoPoller.
    poller_waker: Option<Waker>,
}

impl FifoState {
    fn terminate(&mut self) {
        self.fifo.take();
        for (_, request_state) in self.map.iter_mut() {
            request_state.result.get_or_insert(zx::Status::CANCELED);
            if let Some(waker) = request_state.waker.take() {
                waker.wake();
            }
        }
        if let Some(waker) = self.poller_waker.take() {
            waker.wake();
        }
    }

    // Returns true if polling should be terminated.
    fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
        let fifo = if let Some(fifo) = self.fifo.as_ref() {
            fifo
        } else {
            return true;
        };

        loop {
            let slice = self.queue.as_slices().0;
            if slice.is_empty() {
                return false;
            }
            match fifo.try_write(context, slice) {
                Poll::Ready(Ok(sent)) => {
                    self.queue.drain(0..sent);
                }
                Poll::Ready(Err(_)) => {
                    self.terminate();
                    return true;
                }
                Poll::Pending => {
                    return false;
                }
            }
        }
    }
}

type FifoStateRef = Arc<Mutex<FifoState>>;

// A future used for fifo responses.
struct ResponseFuture {
    request_id: u32,
    fifo_state: FifoStateRef,
}

impl ResponseFuture {
    fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
        ResponseFuture { request_id, fifo_state }
    }
}

impl Future for ResponseFuture {
    type Output = Result<(), zx::Status>;

    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state = self.fifo_state.lock();
        let request_state = state.map.get_mut(&self.request_id).unwrap();
        if let Some(result) = request_state.result {
            Poll::Ready(result.into())
        } else {
            request_state.waker.replace(context.waker().clone());
            Poll::Pending
        }
    }
}

impl Drop for ResponseFuture {
    fn drop(&mut self) {
        self.fifo_state.lock().map.remove(&self.request_id).unwrap();
    }
}

/// Wraps a vmo-id. Will panic if you forget to detach.
#[derive(Debug)]
#[must_use]
pub struct VmoId(AtomicU16);

impl VmoId {
    /// VmoIds will normally be vended by attach_vmo, but this might be used in some tests
    pub fn new(id: u16) -> Self {
        Self(AtomicU16::new(id))
    }

    /// Invalidates self and returns a new VmoId with the same underlying ID.
    pub fn take(&self) -> Self {
        Self(AtomicU16::new(self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)))
    }

    pub fn is_valid(&self) -> bool {
        self.id() != block_driver::BLOCK_VMOID_INVALID
    }

    /// Takes the ID.  The caller assumes responsibility for detaching.
    #[must_use]
    pub fn into_id(self) -> u16 {
        self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)
    }

    pub fn id(&self) -> u16 {
        self.0.load(Ordering::Relaxed)
    }
}

impl PartialEq for VmoId {
    fn eq(&self, other: &Self) -> bool {
        self.id() == other.id()
    }
}

impl Eq for VmoId {}

impl Drop for VmoId {
    fn drop(&mut self) {
        assert_eq!(
            self.0.load(Ordering::Relaxed),
            block_driver::BLOCK_VMOID_INVALID,
            "Did you forget to detach?"
        );
    }
}

impl Hash for VmoId {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.id().hash(state);
    }
}
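
// A sketch (assuming `client` is any `BlockClient`) of the `VmoId` ownership rules: every
// attached id must be explicitly detached (or deliberately leaked with `into_id`) before
// it is dropped, otherwise the `Drop` impl above panics.
async fn _vmo_id_example(client: &dyn BlockClient) -> Result<(), zx::Status> {
    let vmo = zx::Vmo::create(8192)?;
    let vmo_id = client.attach_vmo(&vmo).await?;
    // ... issue reads and writes that reference `vmo_id` here ...
    client.detach_vmo(vmo_id).await?; // consumes the id, satisfying Drop
    Ok(())
}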

/// Represents a client connection to a block device. This is a simplified version of the block.fidl
/// interface.
/// Most users will use the RemoteBlockClient instantiation of this trait.
#[async_trait]
pub trait BlockClient: Send + Sync {
    /// Wraps AttachVmo from fuchsia.hardware.block::Block.
    async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status>;

    /// Detaches the given vmo-id from the device.
    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status>;

    /// Reads from the device at |device_offset| into the given buffer slice.
    async fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), zx::Status> {
        self.read_at_traced(buffer_slice, device_offset, NO_TRACE_ID).await
    }

    async fn read_at_traced(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status>;

    /// Writes the data in |buffer_slice| to the device.
    async fn write_at(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), zx::Status> {
        self.write_at_with_opts_traced(
            buffer_slice,
            device_offset,
            WriteOptions::empty(),
            NO_TRACE_ID,
        )
        .await
    }

    async fn write_at_with_opts(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
    ) -> Result<(), zx::Status> {
        self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID).await
    }

    async fn write_at_with_opts_traced(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status>;

    /// Trims the given range on the block device.
    async fn trim(&self, device_range: Range<u64>) -> Result<(), zx::Status> {
        self.trim_traced(device_range, NO_TRACE_ID).await
    }

    async fn trim_traced(
        &self,
        device_range: Range<u64>,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status>;

    async fn flush(&self) -> Result<(), zx::Status> {
        self.flush_traced(NO_TRACE_ID).await
    }

    /// Sends a flush request to the underlying block device.
    async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status>;

    /// Closes the fifo.
    async fn close(&self) -> Result<(), zx::Status>;

    /// Returns the block size of the device.
    fn block_size(&self) -> u32;

    /// Returns the size, in blocks, of the device.
    fn block_count(&self) -> u64;

    /// Returns the maximum number of blocks which can be transferred in a single request.
    fn max_transfer_blocks(&self) -> Option<NonZero<u32>>;

    /// Returns the block flags reported by the device.
    fn block_flags(&self) -> BlockFlags;

    /// Returns true if the remote fifo is still connected.
    fn is_connected(&self) -> bool;
}
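
// A minimal usage sketch of the trait (assuming `client` implements `BlockClient`, e.g. a
// `RemoteBlockClient`): write one block, flush, then read it back. Byte offsets and
// lengths must be multiples of `block_size()`.
async fn _block_client_example(client: &dyn BlockClient) -> Result<(), zx::Status> {
    let block_size = client.block_size() as usize;
    let data = vec![0xffu8; block_size];
    client.write_at(data[..].into(), 0).await?;
    client.flush().await?;
    let mut readback = vec![0u8; block_size];
    client.read_at(readback.as_mut_slice().into(), 0).await?;
    assert_eq!(readback, data);
    Ok(())
}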

struct Common {
    block_size: u32,
    block_count: u64,
    max_transfer_blocks: Option<NonZero<u32>>,
    block_flags: BlockFlags,
    fifo_state: FifoStateRef,
    temp_vmo: futures::lock::Mutex<zx::Vmo>,
    temp_vmo_id: VmoId,
}

impl Common {
    fn new(
        fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
        info: &block::BlockInfo,
        temp_vmo: zx::Vmo,
        temp_vmo_id: VmoId,
    ) -> Self {
        let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
        fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
        Self {
            block_size: info.block_size,
            block_count: info.block_count,
            max_transfer_blocks: if info.max_transfer_size != MAX_TRANSFER_UNBOUNDED {
                NonZero::new(info.max_transfer_size / info.block_size)
            } else {
                None
            },
            block_flags: info.flags,
            fifo_state,
            temp_vmo: futures::lock::Mutex::new(temp_vmo),
            temp_vmo_id,
        }
    }

    fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
        if bytes % self.block_size as u64 != 0 {
            Err(zx::Status::INVALID_ARGS)
        } else {
            Ok(bytes / self.block_size as u64)
        }
    }

    // Sends the request and waits for the response.
    async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
        async move {
            let (request_id, trace_flow_id) = {
                let mut state = self.fifo_state.lock();
                if state.fifo.is_none() {
                    // Fifo has been closed.
                    return Err(zx::Status::CANCELED);
                }
                trace::duration!(
                    c"storage",
                    c"block_client::send::start",
                    "op" => opcode_str(request.command.opcode)
                );
                let request_id = state.next_request_id;
                state.next_request_id = state.next_request_id.wrapping_add(1);
                assert!(
                    state.map.insert(request_id, RequestState::default()).is_none(),
                    "request id in use!"
                );
                request.reqid = request_id;
                if request.trace_flow_id == NO_TRACE_ID {
                    request.trace_flow_id = generate_trace_flow_id(request_id);
                }
                let trace_flow_id = request.trace_flow_id;
                trace::flow_begin!(c"storage", c"block_client::send", trace_flow_id);
                state.queue.push_back(request);
                if let Some(waker) = state.poller_waker.clone() {
                    state.poll_send_requests(&mut Context::from_waker(&waker));
                }
                (request_id, trace_flow_id)
            };
            ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
            trace::duration!(c"storage", c"block_client::send::end");
            trace::flow_end!(c"storage", c"block_client::send", trace_flow_id);
            Ok(())
        }
        .await
    }

    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
        self.send(BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: BlockOpcode::CloseVmo.into_primitive(),
                flags: 0,
                ..Default::default()
            },
            vmoid: vmo_id.into_id(),
            ..Default::default()
        })
        .await
    }

    async fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        match buffer_slice {
            MutableBufferSlice::VmoId { vmo_id, offset, length } => {
                self.send(BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Read.into_primitive(),
                        flags: 0,
                        ..Default::default()
                    },
                    vmoid: vmo_id.id(),
                    length: self
                        .to_blocks(length)?
                        .try_into()
                        .map_err(|_| zx::Status::INVALID_ARGS)?,
                    vmo_offset: self.to_blocks(offset)?,
                    dev_offset: self.to_blocks(device_offset)?,
                    trace_flow_id,
                    ..Default::default()
                })
                .await?
            }
            MutableBufferSlice::Memory(mut slice) => {
                let temp_vmo = self.temp_vmo.lock().await;
                let mut device_block = self.to_blocks(device_offset)?;
                loop {
                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
                    let block_count = self.to_blocks(to_do as u64)? as u32;
                    self.send(BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Read.into_primitive(),
                            flags: 0,
                            ..Default::default()
                        },
                        vmoid: self.temp_vmo_id.id(),
                        length: block_count,
                        vmo_offset: 0,
                        dev_offset: device_block,
                        trace_flow_id,
                        ..Default::default()
                    })
                    .await?;
                    temp_vmo.read(&mut slice[..to_do], 0)?;
                    if to_do == slice.len() {
                        break;
                    }
                    device_block += block_count as u64;
                    slice = &mut slice[to_do..];
                }
            }
        }
        Ok(())
    }

    async fn write_at(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        let flags = if opts.contains(WriteOptions::FORCE_ACCESS) {
            BlockIoFlag::FORCE_ACCESS.bits()
        } else {
            0
        };
        match buffer_slice {
            BufferSlice::VmoId { vmo_id, offset, length } => {
                self.send(BlockFifoRequest {
                    command: BlockFifoCommand {
                        opcode: BlockOpcode::Write.into_primitive(),
                        flags,
                        ..Default::default()
                    },
                    vmoid: vmo_id.id(),
                    length: self
                        .to_blocks(length)?
                        .try_into()
                        .map_err(|_| zx::Status::INVALID_ARGS)?,
                    vmo_offset: self.to_blocks(offset)?,
                    dev_offset: self.to_blocks(device_offset)?,
                    trace_flow_id,
                    ..Default::default()
                })
                .await?;
            }
            BufferSlice::Memory(mut slice) => {
                let temp_vmo = self.temp_vmo.lock().await;
                let mut device_block = self.to_blocks(device_offset)?;
                loop {
                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
                    let block_count = self.to_blocks(to_do as u64)? as u32;
                    temp_vmo.write(&slice[..to_do], 0)?;
                    self.send(BlockFifoRequest {
                        command: BlockFifoCommand {
                            opcode: BlockOpcode::Write.into_primitive(),
                            flags,
                            ..Default::default()
                        },
                        vmoid: self.temp_vmo_id.id(),
                        length: block_count,
                        vmo_offset: 0,
                        dev_offset: device_block,
                        trace_flow_id,
                        ..Default::default()
                    })
                    .await?;
                    if to_do == slice.len() {
                        break;
                    }
                    device_block += block_count as u64;
                    slice = &slice[to_do..];
                }
            }
        }
        Ok(())
    }

    async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
        let length = self.to_blocks(device_range.end - device_range.start)? as u32;
        let dev_offset = self.to_blocks(device_range.start)?;
        self.send(BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: BlockOpcode::Trim.into_primitive(),
                flags: 0,
                ..Default::default()
            },
            vmoid: block_driver::BLOCK_VMOID_INVALID,
            length,
            dev_offset,
            trace_flow_id,
            ..Default::default()
        })
        .await
    }

    async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
        self.send(BlockFifoRequest {
            command: BlockFifoCommand {
                opcode: BlockOpcode::Flush.into_primitive(),
                flags: 0,
                ..Default::default()
            },
            vmoid: block_driver::BLOCK_VMOID_INVALID,
            trace_flow_id,
            ..Default::default()
        })
        .await
    }

    fn block_size(&self) -> u32 {
        self.block_size
    }

    fn block_count(&self) -> u64 {
        self.block_count
    }

    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
        self.max_transfer_blocks.clone()
    }

    fn block_flags(&self) -> BlockFlags {
        self.block_flags
    }

    fn is_connected(&self) -> bool {
        self.fifo_state.lock().fifo.is_some()
    }
}

impl Drop for Common {
    fn drop(&mut self) {
        // It's OK to leak the VMO id because the server will dump all VMOs when the fifo is torn
        // down.
        let _ = self.temp_vmo_id.take().into_id();
        self.fifo_state.lock().terminate();
    }
}

/// RemoteBlockClient is a BlockClient that communicates with a real block device over FIDL.
pub struct RemoteBlockClient {
    session: block::SessionProxy,
    common: Common,
}

pub trait AsBlockProxy {
    fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>>;

    fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error>;
}

impl<T: AsBlockProxy> AsBlockProxy for &T {
    fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>> {
        AsBlockProxy::get_info(*self)
    }
    fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error> {
        AsBlockProxy::open_session(*self, session)
    }
}

macro_rules! impl_as_block_proxy {
    ($name:ident) => {
        impl AsBlockProxy for $name {
            async fn get_info(&self) -> Result<block::BlockGetInfoResult, fidl::Error> {
                $name::get_info(self).await
            }

            fn open_session(
                &self,
                session: ServerEnd<block::SessionMarker>,
            ) -> Result<(), fidl::Error> {
                $name::open_session(self, session)
            }
        }
    };
}

impl_as_block_proxy!(BlockProxy);
impl_as_block_proxy!(PartitionProxy);
impl_as_block_proxy!(VolumeProxy);

impl RemoteBlockClient {
    /// Returns a connection to a remote block device via the given channel.
    pub async fn new(remote: impl AsBlockProxy) -> Result<Self, zx::Status> {
        let info =
            remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        let (session, server) = fidl::endpoints::create_proxy();
        let () = remote.open_session(server).map_err(fidl_to_status)?;
        Self::from_session(info, session).await
    }

    pub async fn from_session(
        info: block::BlockInfo,
        session: block::SessionProxy,
    ) -> Result<Self, zx::Status> {
        let fifo =
            session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        let fifo = fasync::Fifo::from_fifo(fifo);
        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id =
            session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        let vmo_id = VmoId::new(vmo_id.id);
        Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
    }
}
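
// A sketch of establishing a connection, assuming `proxy` is a connected `BlockProxy`;
// anything implementing `AsBlockProxy` (e.g. `PartitionProxy` or `VolumeProxy`) works the
// same way.
async fn _connect_example(proxy: BlockProxy) -> Result<RemoteBlockClient, zx::Status> {
    let client = RemoteBlockClient::new(proxy).await?;
    assert!(client.is_connected());
    Ok(client)
}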

#[async_trait]
impl BlockClient for RemoteBlockClient {
    async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = self
            .session
            .attach_vmo(dup)
            .await
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(VmoId::new(vmo_id.id))
    }

    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
        self.common.detach_vmo(vmo_id).await
    }

    async fn read_at_traced(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        self.common.read_at(buffer_slice, device_offset, trace_flow_id).await
    }

    async fn write_at_with_opts_traced(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
        opts: WriteOptions,
        trace_flow_id: u64,
    ) -> Result<(), zx::Status> {
        self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
    }

    async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
        self.common.trim(range, trace_flow_id).await
    }

    async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
        self.common.flush(trace_flow_id).await
    }

    async fn close(&self) -> Result<(), zx::Status> {
        let () =
            self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
        Ok(())
    }

    fn block_size(&self) -> u32 {
        self.common.block_size()
    }

    fn block_count(&self) -> u64 {
        self.common.block_count()
    }

    fn max_transfer_blocks(&self) -> Option<NonZero<u32>> {
        self.common.max_transfer_blocks()
    }

    fn block_flags(&self) -> BlockFlags {
        self.common.block_flags()
    }

    fn is_connected(&self) -> bool {
        self.common.is_connected()
    }
}

pub struct RemoteBlockClientSync {
    session: block::SessionSynchronousProxy,
    common: Common,
}

impl RemoteBlockClientSync {
    /// Returns a connection to a remote block device via the given channel, but spawns a separate
    /// thread for polling the fifo which makes it work in cases where no executor is configured for
    /// the calling thread.
    pub fn new(
        client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
    ) -> Result<Self, zx::Status> {
        let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
        let info = remote
            .get_info(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let (client, server) = fidl::endpoints::create_endpoints();
        let () = remote.open_session(server).map_err(fidl_to_status)?;
        let session = block::SessionSynchronousProxy::new(client.into_channel());
        let fifo = session
            .get_fifo(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = session
            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        let vmo_id = VmoId::new(vmo_id.id);

        // The fifo needs to be instantiated from the thread that has the executor as that's where
        // the fifo registers for notifications to be delivered.
        let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
        std::thread::spawn(move || {
            let mut executor = fasync::LocalExecutor::new();
            let fifo = fasync::Fifo::from_fifo(fifo);
            let common = Common::new(fifo, &info, temp_vmo, vmo_id);
            let fifo_state = common.fifo_state.clone();
            let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
            executor.run_singlethreaded(FifoPoller { fifo_state });
        });
        block_on(receiver).map_err(|_| zx::Status::CANCELED)?
    }

    pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
        let vmo_id = self
            .session
            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(VmoId::new(vmo_id.id))
    }

    pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
        block_on(self.common.detach_vmo(vmo_id))
    }

    pub fn read_at(
        &self,
        buffer_slice: MutableBufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), zx::Status> {
        block_on(self.common.read_at(buffer_slice, device_offset, NO_TRACE_ID))
    }

    pub fn write_at(
        &self,
        buffer_slice: BufferSlice<'_>,
        device_offset: u64,
    ) -> Result<(), zx::Status> {
        block_on(self.common.write_at(
            buffer_slice,
            device_offset,
            WriteOptions::empty(),
            NO_TRACE_ID,
        ))
    }

    pub fn flush(&self) -> Result<(), zx::Status> {
        block_on(self.common.flush(NO_TRACE_ID))
    }

    pub fn close(&self) -> Result<(), zx::Status> {
        let () = self
            .session
            .close(zx::MonotonicInstant::INFINITE)
            .map_err(fidl_to_status)?
            .map_err(zx::Status::from_raw)?;
        Ok(())
    }

    pub fn block_size(&self) -> u32 {
        self.common.block_size()
    }

    pub fn block_count(&self) -> u64 {
        self.common.block_count()
    }

    pub fn is_connected(&self) -> bool {
        self.common.is_connected()
    }
}
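
// A sketch of the synchronous client, assuming `client_end` is a channel to a block
// device: it is usable from threads without an async executor, because fifo polling
// happens on the dedicated thread spawned by `new`.
fn _sync_client_example(
    client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
) -> Result<(), zx::Status> {
    let client = RemoteBlockClientSync::new(client_end)?;
    let mut buf = vec![0u8; client.block_size() as usize];
    client.read_at(buf.as_mut_slice().into(), 0)?;
    // Dropping the client sends Close to the session automatically (see the Drop impl
    // below).
    Ok(())
}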

impl Drop for RemoteBlockClientSync {
    fn drop(&mut self) {
        // Ignore errors here as there is not much we can do about it.
        let _ = self.close();
    }
}

// FifoPoller is a future responsible for sending and receiving from the fifo.
struct FifoPoller {
    fifo_state: FifoStateRef,
}

impl Future for FifoPoller {
    type Output = ();

    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state_lock = self.fifo_state.lock();
        let state = state_lock.deref_mut(); // So that we can split the borrow.

        // Send requests.
        if state.poll_send_requests(context) {
            return Poll::Ready(());
        }

        // Receive responses.
        let fifo = state.fifo.as_ref().unwrap(); // Safe because poll_send_requests checks.
        loop {
            let mut response = MaybeUninit::uninit();
            match fifo.try_read(context, &mut response) {
                Poll::Pending => {
                    state.poller_waker = Some(context.waker().clone());
                    return Poll::Pending;
                }
                Poll::Ready(Ok(_)) => {
                    let response = unsafe { response.assume_init() };
                    let request_id = response.reqid;
                    // If the request isn't in the map, assume that it's a cancelled read.
                    if let Some(request_state) = state.map.get_mut(&request_id) {
                        request_state.result.replace(zx::Status::from_raw(response.status));
                        if let Some(waker) = request_state.waker.take() {
                            waker.wake();
                        }
                    }
                }
                Poll::Ready(Err(_)) => {
                    state.terminate();
                    return Poll::Ready(());
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{
        BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
        RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
    };
    use block_server::{BlockServer, DeviceInfo, PartitionInfo};
    use fidl::endpoints::RequestStream as _;
    use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
    use futures::join;
    use futures::stream::futures_unordered::FuturesUnordered;
    use futures::stream::StreamExt as _;
    use ramdevice_client::RamdiskClient;
    use std::borrow::Cow;
    use std::num::NonZero;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use {fidl_fuchsia_hardware_block as block, fuchsia_async as fasync};

    const RAMDISK_BLOCK_SIZE: u64 = 1024;
    const RAMDISK_BLOCK_COUNT: u64 = 1024;

    pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
        let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
            .await
            .expect("RamdiskClient::create failed");
        let client_end = ramdisk.open().expect("ramdisk.open failed");
        let proxy = client_end.into_proxy();
        let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
        assert_eq!(block_client.block_size(), 1024);
        let client_end = ramdisk.open().expect("ramdisk.open failed");
        let proxy = client_end.into_proxy();
        (ramdisk, proxy, block_client)
    }

    #[fuchsia::test]
    async fn test_against_ram_disk() {
        let (_ramdisk, block_proxy, block_client) = make_ramdisk().await;

        let stats_before = block_proxy
            .get_stats(false)
            .await
            .expect("get_stats failed")
            .map_err(zx::Status::from_raw)
            .expect("get_stats error");

        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        vmo.write(b"hello", 5).expect("vmo.write failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        block_client
            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
            .await
            .expect("write_at failed");
        block_client
            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
            .await
            .expect("read_at failed");
        let mut buf: [u8; 5] = Default::default();
        vmo.read(&mut buf, 1029).expect("vmo.read failed");
        assert_eq!(&buf, b"hello");
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");

        // check that the stats are what we expect them to be
        let stats_after = block_proxy
            .get_stats(false)
            .await
            .expect("get_stats failed")
            .map_err(zx::Status::from_raw)
            .expect("get_stats error");
        // write stats
        assert_eq!(
            stats_before.write.success.total_calls + 1,
            stats_after.write.success.total_calls
        );
        assert_eq!(
            stats_before.write.success.bytes_transferred + 1024,
            stats_after.write.success.bytes_transferred
        );
        assert_eq!(stats_before.write.failure.total_calls, stats_after.write.failure.total_calls);
        // read stats
        assert_eq!(stats_before.read.success.total_calls + 1, stats_after.read.success.total_calls);
        assert_eq!(
            stats_before.read.success.bytes_transferred + 2048,
            stats_after.read.success.bytes_transferred
        );
        assert_eq!(stats_before.read.failure.total_calls, stats_after.read.failure.total_calls);
    }

    #[fuchsia::test]
    async fn test_against_ram_disk_with_flush() {
        let (_ramdisk, block_proxy, block_client) = make_ramdisk().await;

        let stats_before = block_proxy
            .get_stats(false)
            .await
            .expect("get_stats failed")
            .map_err(zx::Status::from_raw)
            .expect("get_stats error");

        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        vmo.write(b"hello", 5).expect("vmo.write failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        block_client
            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
            .await
            .expect("write_at failed");
        block_client.flush().await.expect("flush failed");
        block_client
            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
            .await
            .expect("read_at failed");
        let mut buf: [u8; 5] = Default::default();
        vmo.read(&mut buf, 1029).expect("vmo.read failed");
        assert_eq!(&buf, b"hello");
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");

        // check that the stats are what we expect them to be
        let stats_after = block_proxy
            .get_stats(false)
            .await
            .expect("get_stats failed")
            .map_err(zx::Status::from_raw)
            .expect("get_stats error");
        // write stats
        assert_eq!(
            stats_before.write.success.total_calls + 1,
            stats_after.write.success.total_calls
        );
        assert_eq!(
            stats_before.write.success.bytes_transferred + 1024,
            stats_after.write.success.bytes_transferred
        );
        assert_eq!(stats_before.write.failure.total_calls, stats_after.write.failure.total_calls);
        // flush stats
        assert_eq!(
            stats_before.flush.success.total_calls + 1,
            stats_after.flush.success.total_calls
        );
        assert_eq!(stats_before.flush.failure.total_calls, stats_after.flush.failure.total_calls);
        // read stats
        assert_eq!(stats_before.read.success.total_calls + 1, stats_after.read.success.total_calls);
        assert_eq!(
            stats_before.read.success.bytes_transferred + 2048,
            stats_after.read.success.bytes_transferred
        );
        assert_eq!(stats_before.read.failure.total_calls, stats_after.read.failure.total_calls);
    }

    #[fuchsia::test]
    async fn test_alignment() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        block_client
            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
            .await
            .expect_err("expected failure due to bad alignment");
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }

    #[fuchsia::test]
    async fn test_parallel_io() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        let mut reads = Vec::new();
        for _ in 0..1024 {
            reads.push(
                block_client
                    .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
                    .inspect_err(|e| panic!("read should have succeeded: {}", e)),
            );
        }
        futures::future::join_all(reads).await;
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }

    #[fuchsia::test]
    async fn test_closed_device() {
        let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        let mut reads = Vec::new();
        for _ in 0..1024 {
            reads.push(
                block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
            );
        }
        assert!(block_client.is_connected());
        let _ = futures::join!(futures::future::join_all(reads), async {
            ramdisk.destroy().await.expect("ramdisk.destroy failed")
        });
        // Destroying the ramdisk is asynchronous. Keep issuing reads until they start failing.
        while block_client
            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
            .await
            .is_ok()
        {}

        // Sometimes the FIFO will start rejecting requests before the FIFO is actually closed, so
        // we can get false positives from is_connected.
        while block_client.is_connected() {
            // Sleep for a bit to minimise lock contention.
            fasync::Timer::new(fasync::MonotonicInstant::after(
                zx::MonotonicDuration::from_millis(500),
            ))
            .await;
        }

        // But once is_connected returns false, it should stay false.
        assert_eq!(block_client.is_connected(), false);
        let _ = block_client.detach_vmo(vmo_id).await;
    }

    #[fuchsia::test]
    async fn test_cancelled_reads() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
        {
            let mut reads = FuturesUnordered::new();
            for _ in 0..1024 {
                reads.push(
                    block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
                );
            }
            // Read the first 500 results and then dump the rest.
            for _ in 0..500 {
                reads.next().await;
            }
        }
        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
    }

    #[fuchsia::test]
    async fn test_parallel_large_read_and_write_with_memory_succeeds() {
        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
        let block_client_ref = &block_client;
        let test_one = |offset, len, fill| async move {
            let buf = vec![fill; len];
            block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
            // Read back an extra block either side.
            let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
            block_client_ref
                .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
                .await
                .expect("read_at failed");
            assert_eq!(
                &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
                &[0; RAMDISK_BLOCK_SIZE as usize][..]
            );
            assert_eq!(
                &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
                &buf[..]
            );
            assert_eq!(
                &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
                &[0; RAMDISK_BLOCK_SIZE as usize][..]
            );
        };
        const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
        join!(
            test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
            test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
        );
    }

    // Implements a dummy server which test cases can use to verify that channel messages and
    // fifo operations are received, via the 'channel_handler' and 'fifo_handler' closures
    // respectively.
    struct FakeBlockServer<'a> {
        server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
        channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
        fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
    }

    impl<'a> FakeBlockServer<'a> {
        // Creates a new FakeBlockServer given a channel to listen on.
        //
        // The 'channel_handler' and 'fifo_handler' closures allow for customizing how the server
        // handles requests received from the channel or the fifo respectively.
        //
        // 'channel_handler' receives a message before it is handled by the default implementation
        // and can return 'true' to indicate all processing is done and no further processing of
        // that message is required.
        //
        // 'fifo_handler' takes as input a BlockFifoRequest and produces a response which the
        // FakeBlockServer will send over the fifo.
        fn new(
            server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
            channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
            fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
        ) -> FakeBlockServer<'a> {
            FakeBlockServer {
                server_channel: Some(server_channel),
                channel_handler: Box::new(channel_handler),
                fifo_handler: Box::new(fifo_handler),
            }
        }

        // Runs the server
        async fn run(&mut self) {
            let server = self.server_channel.take().unwrap();

            // Set up a mock server.
            let (server_fifo, client_fifo) =
                zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
                    .expect("Fifo::create failed");
            let maybe_server_fifo = fuchsia_sync::Mutex::new(Some(client_fifo));

            let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
            let fifo_future = Abortable::new(
                async {
                    let mut fifo = fasync::Fifo::from_fifo(server_fifo);
                    let (mut reader, mut writer) = fifo.async_io();
                    let mut request = BlockFifoRequest::default();
                    loop {
                        match reader.read_entries(&mut request).await {
                            Ok(1) => {}
                            Err(zx::Status::PEER_CLOSED) => break,
                            Err(e) => panic!("read_entry failed {:?}", e),
                            _ => unreachable!(),
                        };

                        let response = self.fifo_handler.as_ref()(request);
                        writer
                            .write_entries(std::slice::from_ref(&response))
                            .await
                            .expect("write_entries failed");
                    }
                },
                fifo_future_abort_registration,
            );

            let channel_future = async {
                server
                    .into_stream()
                    .for_each_concurrent(None, |request| async {
                        let request = request.expect("unexpected fidl error");

                        match request {
                            block::BlockRequest::GetInfo { responder } => {
                                responder
                                    .send(Ok(&block::BlockInfo {
                                        block_count: 1024,
                                        block_size: 512,
                                        max_transfer_size: 1024 * 1024,
                                        flags: block::Flag::empty(),
                                    }))
                                    .expect("send failed");
                            }
                            block::BlockRequest::OpenSession { session, control_handle: _ } => {
                                let stream = session.into_stream();
                                stream
                                    .for_each(|request| async {
                                        let request = request.expect("unexpected fidl error");
                                        // Give a chance for the test to register and potentially
                                        // handle the event
                                        if self.channel_handler.as_ref()(&request) {
                                            return;
                                        }
                                        match request {
                                            block::SessionRequest::GetFifo { responder } => {
                                                match maybe_server_fifo.lock().take() {
                                                    Some(fifo) => {
                                                        responder.send(Ok(fifo.downcast()))
                                                    }
                                                    None => responder.send(Err(
                                                        zx::Status::NO_RESOURCES.into_raw(),
                                                    )),
                                                }
                                                .expect("send failed")
                                            }
                                            block::SessionRequest::AttachVmo {
                                                vmo: _,
                                                responder,
                                            } => responder
                                                .send(Ok(&block::VmoId { id: 1 }))
                                                .expect("send failed"),
                                            block::SessionRequest::Close { responder } => {
                                                fifo_future_abort.abort();
                                                responder.send(Ok(())).expect("send failed")
                                            }
                                        }
                                    })
                                    .await
                            }
                            _ => panic!("Unexpected message"),
                        }
                    })
                    .await;
            };

            let _result = join!(fifo_future, channel_future);
            // _result can be Err(Aborted) since the Close handler calls abort(), but that's
            // expected.
        }
    }

    #[fuchsia::test]
    async fn test_block_close_is_called() {
        let close_called = fuchsia_sync::Mutex::new(false);
        let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();

        std::thread::spawn(move || {
            let _block_client =
                RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
            // The drop here should cause Close to be sent.
        });

        let channel_handler = |request: &block::SessionRequest| -> bool {
            if let block::SessionRequest::Close { .. } = request {
                *close_called.lock() = true;
            }
            false
        };
        FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;

        // After the server has finished running, we can check to see that close was called.
        assert!(*close_called.lock());
    }

    #[fuchsia::test]
    async fn test_block_flush_is_called() {
        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();

        struct Interface {
            flush_called: Arc<AtomicBool>,
        }
        impl block_server::async_interface::Interface for Interface {
            async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
                Ok(Cow::Owned(DeviceInfo::Partition(PartitionInfo {
                    device_flags: fidl_fuchsia_hardware_block::Flag::empty(),
                    max_transfer_blocks: None,
                    block_range: Some(0..1000),
                    type_guid: [0; 16],
                    instance_guid: [0; 16],
                    name: "foo".to_string(),
                    flags: 0,
                })))
            }

            async fn read(
                &self,
                _device_block_offset: u64,
                _block_count: u32,
                _vmo: &Arc<zx::Vmo>,
                _vmo_offset: u64,
                _trace_flow_id: Option<NonZero<u64>>,
            ) -> Result<(), zx::Status> {
                unreachable!();
            }

            async fn write(
                &self,
                _device_block_offset: u64,
                _block_count: u32,
                _vmo: &Arc<zx::Vmo>,
                _vmo_offset: u64,
                _opts: WriteOptions,
                _trace_flow_id: Option<NonZero<u64>>,
            ) -> Result<(), zx::Status> {
                unreachable!();
            }

            async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
                self.flush_called.store(true, Ordering::Relaxed);
                Ok(())
            }

            async fn trim(
                &self,
                _device_block_offset: u64,
                _block_count: u32,
                _trace_flow_id: Option<NonZero<u64>>,
            ) -> Result<(), zx::Status> {
                unreachable!();
            }
        }

        let flush_called = Arc::new(AtomicBool::new(false));

        futures::join!(
            async {
                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");

                block_client.flush().await.expect("flush failed");
            },
            async {
                let block_server = BlockServer::new(
                    512,
                    Arc::new(Interface { flush_called: flush_called.clone() }),
                );
                block_server.handle_requests(stream.cast_stream()).await.unwrap();
            }
        );

        assert!(flush_called.load(Ordering::Relaxed));
    }

    #[fuchsia::test]
    async fn test_trace_flow_ids_set() {
        let (proxy, server) = fidl::endpoints::create_proxy();

        futures::join!(
            async {
                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
                block_client.flush().await.expect("flush failed");
            },
            async {
                let flow_id: fuchsia_sync::Mutex<Option<u64>> = fuchsia_sync::Mutex::new(None);
                let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
                    if request.trace_flow_id > 0 {
                        *flow_id.lock() = Some(request.trace_flow_id);
                    }
                    BlockFifoResponse {
                        status: zx::Status::OK.into_raw(),
                        reqid: request.reqid,
                        ..Default::default()
                    }
                };
                FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
                // After the server has finished running, verify the trace flow ID was set to some
                // value.
                assert!(flow_id.lock().is_some());
            }
        );
    }
}