block_client/
lib.rs

1// Copyright 2020 The Fuchsia Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
5use async_trait::async_trait;
6use fidl::endpoints::ServerEnd;
7use fidl_fuchsia_hardware_block::BlockProxy;
8use fidl_fuchsia_hardware_block_partition::PartitionProxy;
9use fidl_fuchsia_hardware_block_volume::VolumeProxy;
10use fuchsia_async::{self as fasync, FifoReadable as _, FifoWritable as _};
11use futures::channel::oneshot;
12use futures::executor::block_on;
13use lazy_static::lazy_static;
14use std::collections::HashMap;
15use std::future::Future;
16use std::hash::{Hash, Hasher};
17use std::ops::{DerefMut, Range};
18use std::pin::Pin;
19use std::sync::atomic::{AtomicU16, Ordering};
20use std::sync::{Arc, Mutex};
21use std::task::{Context, Poll, Waker};
22use zx::sys::zx_handle_t;
23use zx::{self as zx, HandleBased as _};
24use {
25    fidl_fuchsia_hardware_block as block, fidl_fuchsia_hardware_block_driver as block_driver,
26    storage_trace as trace,
27};
28
29pub use cache::Cache;
30
31pub use block::Flag as BlockFlags;
32
33pub use block_protocol::*;
34
35pub mod cache;
36
// Size, in bytes, of the scratch VMO used to stage transfers for `Memory` buffer slices. Memory
// transfers are split into chunks of at most this many bytes.
const TEMP_VMO_SIZE: usize = 65536;
38
39/// If a trace flow ID isn't specified for requests, one will be generated.
40pub const NO_TRACE_ID: u64 = 0;
41
42pub use block_driver::{BlockIoFlag, BlockOpcode};
43
44fn fidl_to_status(error: fidl::Error) -> zx::Status {
45    match error {
46        fidl::Error::ClientChannelClosed { status, .. } => status,
47        _ => zx::Status::INTERNAL,
48    }
49}
50
51fn opcode_str(opcode: u8) -> &'static str {
52    match BlockOpcode::from_primitive(opcode) {
53        Some(BlockOpcode::Read) => "read",
54        Some(BlockOpcode::Write) => "write",
55        Some(BlockOpcode::Flush) => "flush",
56        Some(BlockOpcode::Trim) => "trim",
57        Some(BlockOpcode::CloseVmo) => "close_vmo",
58        None => "unknown",
59    }
60}
61
62// Generates a trace ID that will be unique across the system (as long as |request_id| isn't
63// reused within this process).
64fn generate_trace_flow_id(request_id: u32) -> u64 {
65    lazy_static! {
66        static ref SELF_HANDLE: zx_handle_t = fuchsia_runtime::process_self().raw_handle();
67    };
68    *SELF_HANDLE as u64 + (request_id as u64) << 32
69}
70
/// A read-only data source for a write to the device.
pub enum BufferSlice<'a> {
    /// A region of a VMO identified by `vmo_id`; `offset` and `length` are byte quantities that
    /// are converted to blocks before the request is sent.
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    /// Plain memory owned by the caller.
    Memory(&'a [u8]),
}
75
76impl<'a> BufferSlice<'a> {
77    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
78        BufferSlice::VmoId { vmo_id, offset, length }
79    }
80}
81
82impl<'a> From<&'a [u8]> for BufferSlice<'a> {
83    fn from(buf: &'a [u8]) -> Self {
84        BufferSlice::Memory(buf)
85    }
86}
87
/// A writable destination for data read from the device.
pub enum MutableBufferSlice<'a> {
    /// A region of a VMO identified by `vmo_id`; `offset` and `length` are byte quantities that
    /// are converted to blocks before the request is sent.
    VmoId { vmo_id: &'a VmoId, offset: u64, length: u64 },
    /// Plain, mutable memory owned by the caller.
    Memory(&'a mut [u8]),
}
92
93impl<'a> MutableBufferSlice<'a> {
94    pub fn new_with_vmo_id(vmo_id: &'a VmoId, offset: u64, length: u64) -> Self {
95        MutableBufferSlice::VmoId { vmo_id, offset, length }
96    }
97}
98
99impl<'a> From<&'a mut [u8]> for MutableBufferSlice<'a> {
100    fn from(buf: &'a mut [u8]) -> Self {
101        MutableBufferSlice::Memory(buf)
102    }
103}
104
// Bookkeeping for a single in-flight request, stored in `FifoState::map` under its request ID.
#[derive(Default)]
struct RequestState {
    // The outcome of the request: set from the response's status when one arrives, or to
    // CANCELED when the fifo is terminated. None while the request is still outstanding.
    result: Option<zx::Status>,
    // Waker for the task polling the matching ResponseFuture, recorded when it polls before a
    // result is available.
    waker: Option<Waker>,
}
110
// State shared (behind a mutex) between the request senders, the per-request ResponseFutures and
// the FifoPoller.
#[derive(Default)]
struct FifoState {
    // The fifo.  This is None once the connection has been terminated.
    fifo: Option<fasync::Fifo<BlockFifoResponse, BlockFifoRequest>>,

    // The next request ID to be used.
    next_request_id: u32,

    // A queue of messages to be sent on the fifo.
    queue: std::collections::VecDeque<BlockFifoRequest>,

    // Map from request ID to RequestState.
    map: HashMap<u32, RequestState>,

    // The waker for the FifoPoller.
    poller_waker: Option<Waker>,
}
128
129impl FifoState {
130    fn terminate(&mut self) {
131        self.fifo.take();
132        for (_, request_state) in self.map.iter_mut() {
133            request_state.result.get_or_insert(zx::Status::CANCELED);
134            if let Some(waker) = request_state.waker.take() {
135                waker.wake();
136            }
137        }
138        if let Some(waker) = self.poller_waker.take() {
139            waker.wake();
140        }
141    }
142
143    // Returns true if polling should be terminated.
144    fn poll_send_requests(&mut self, context: &mut Context<'_>) -> bool {
145        let fifo = if let Some(fifo) = self.fifo.as_ref() {
146            fifo
147        } else {
148            return true;
149        };
150
151        loop {
152            let slice = self.queue.as_slices().0;
153            if slice.is_empty() {
154                return false;
155            }
156            match fifo.write(context, slice) {
157                Poll::Ready(Ok(sent)) => {
158                    self.queue.drain(0..sent);
159                }
160                Poll::Ready(Err(_)) => {
161                    self.terminate();
162                    return true;
163                }
164                Poll::Pending => {
165                    return false;
166                }
167            }
168        }
169    }
170}
171
// Reference-counted, mutex-protected handle to the FifoState, held by Common, FifoPoller and each
// ResponseFuture.
type FifoStateRef = Arc<Mutex<FifoState>>;
173
// A future used for fifo responses.  It resolves once the RequestState registered under
// `request_id` has a result, and removes that entry from the map when dropped.
struct ResponseFuture {
    // ID of the request this future is waiting on; the key into `FifoState::map`.
    request_id: u32,
    // Shared state in which the result for `request_id` is published.
    fifo_state: FifoStateRef,
}
179
180impl ResponseFuture {
181    fn new(fifo_state: FifoStateRef, request_id: u32) -> Self {
182        ResponseFuture { request_id, fifo_state }
183    }
184}
185
186impl Future for ResponseFuture {
187    type Output = Result<(), zx::Status>;
188
189    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
190        let mut state = self.fifo_state.lock().unwrap();
191        let request_state = state.map.get_mut(&self.request_id).unwrap();
192        if let Some(result) = request_state.result {
193            Poll::Ready(result.into())
194        } else {
195            request_state.waker.replace(context.waker().clone());
196            Poll::Pending
197        }
198    }
199}
200
201impl Drop for ResponseFuture {
202    fn drop(&mut self) {
203        self.fifo_state.lock().unwrap().map.remove(&self.request_id).unwrap();
204    }
205}
206
/// Wraps a vmo-id. Will panic if you forget to detach.
///
/// The ID is stored in an atomic so that it can be invalidated through a shared reference (see
/// `take`). Dropping a `VmoId` that still holds a valid ID trips an assertion.
#[derive(Debug)]
#[must_use]
pub struct VmoId(AtomicU16);
211
212impl VmoId {
213    /// VmoIds will normally be vended by attach_vmo, but this might be used in some tests
214    pub fn new(id: u16) -> Self {
215        Self(AtomicU16::new(id))
216    }
217
218    /// Invalidates self and returns a new VmoId with the same underlying ID.
219    pub fn take(&self) -> Self {
220        Self(AtomicU16::new(self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)))
221    }
222
223    pub fn is_valid(&self) -> bool {
224        self.id() != block_driver::BLOCK_VMOID_INVALID
225    }
226
227    /// Takes the ID.  The caller assumes responsibility for detaching.
228    #[must_use]
229    pub fn into_id(self) -> u16 {
230        self.0.swap(block_driver::BLOCK_VMOID_INVALID, Ordering::Relaxed)
231    }
232
233    pub fn id(&self) -> u16 {
234        self.0.load(Ordering::Relaxed)
235    }
236}
237
238impl PartialEq for VmoId {
239    fn eq(&self, other: &Self) -> bool {
240        self.id() == other.id()
241    }
242}
243
244impl Eq for VmoId {}
245
246impl Drop for VmoId {
247    fn drop(&mut self) {
248        assert_eq!(
249            self.0.load(Ordering::Relaxed),
250            block_driver::BLOCK_VMOID_INVALID,
251            "Did you forget to detach?"
252        );
253    }
254}
255
256impl Hash for VmoId {
257    fn hash<H: Hasher>(&self, state: &mut H) {
258        self.id().hash(state);
259    }
260}
261
262/// Represents a client connection to a block device. This is a simplified version of the block.fidl
263/// interface.
264/// Most users will use the RemoteBlockClient instantiation of this trait.
265#[async_trait]
266pub trait BlockClient: Send + Sync {
267    /// Wraps AttachVmo from fuchsia.hardware.block::Block.
268    async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status>;
269
270    /// Detaches the given vmo-id from the device.
271    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status>;
272
273    /// Reads from the device at |device_offset| into the given buffer slice.
274    async fn read_at(
275        &self,
276        buffer_slice: MutableBufferSlice<'_>,
277        device_offset: u64,
278    ) -> Result<(), zx::Status> {
279        self.read_at_traced(buffer_slice, device_offset, 0).await
280    }
281
282    async fn read_at_traced(
283        &self,
284        buffer_slice: MutableBufferSlice<'_>,
285        device_offset: u64,
286        trace_flow_id: u64,
287    ) -> Result<(), zx::Status>;
288
289    /// Writes the data in |buffer_slice| to the device.
290    async fn write_at(
291        &self,
292        buffer_slice: BufferSlice<'_>,
293        device_offset: u64,
294    ) -> Result<(), zx::Status> {
295        self.write_at_with_opts_traced(
296            buffer_slice,
297            device_offset,
298            WriteOptions::empty(),
299            NO_TRACE_ID,
300        )
301        .await
302    }
303
304    async fn write_at_with_opts(
305        &self,
306        buffer_slice: BufferSlice<'_>,
307        device_offset: u64,
308        opts: WriteOptions,
309    ) -> Result<(), zx::Status> {
310        self.write_at_with_opts_traced(buffer_slice, device_offset, opts, NO_TRACE_ID).await
311    }
312
313    async fn write_at_with_opts_traced(
314        &self,
315        buffer_slice: BufferSlice<'_>,
316        device_offset: u64,
317        opts: WriteOptions,
318        trace_flow_id: u64,
319    ) -> Result<(), zx::Status>;
320
321    /// Trims the given range on the block device.
322    async fn trim(&self, device_range: Range<u64>) -> Result<(), zx::Status> {
323        self.trim_traced(device_range, NO_TRACE_ID).await
324    }
325
326    async fn trim_traced(
327        &self,
328        device_range: Range<u64>,
329        trace_flow_id: u64,
330    ) -> Result<(), zx::Status>;
331
332    async fn flush(&self) -> Result<(), zx::Status> {
333        self.flush_traced(NO_TRACE_ID).await
334    }
335
336    /// Sends a flush request to the underlying block device.
337    async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status>;
338
339    /// Closes the fifo.
340    async fn close(&self) -> Result<(), zx::Status>;
341
342    /// Returns the block size of the device.
343    fn block_size(&self) -> u32;
344
345    /// Returns the size, in blocks, of the device.
346    fn block_count(&self) -> u64;
347
348    /// Returns the block flags reported by the device.
349    fn block_flags(&self) -> BlockFlags;
350
351    /// Returns true if the remote fifo is still connected.
352    fn is_connected(&self) -> bool;
353}
354
// State and logic shared by the asynchronous and synchronous clients.
struct Common {
    // Block size in bytes, as reported in the device's BlockInfo.
    block_size: u32,
    // Number of blocks, as reported in the device's BlockInfo.
    block_count: u64,
    // Flags reported in the device's BlockInfo.
    block_flags: BlockFlags,
    // Shared fifo state; also held by the FifoPoller and by each in-flight ResponseFuture.
    fifo_state: FifoStateRef,
    // Scratch VMO used to stage `Memory` buffer slices.  The async mutex is held for the whole
    // of a memory transfer.
    temp_vmo: futures::lock::Mutex<zx::Vmo>,
    // The vmo-id under which `temp_vmo` is attached to the session.
    temp_vmo_id: VmoId,
}
363
364impl Common {
365    fn new(
366        fifo: fasync::Fifo<BlockFifoResponse, BlockFifoRequest>,
367        info: &block::BlockInfo,
368        temp_vmo: zx::Vmo,
369        temp_vmo_id: VmoId,
370    ) -> Self {
371        let fifo_state = Arc::new(Mutex::new(FifoState { fifo: Some(fifo), ..Default::default() }));
372        fasync::Task::spawn(FifoPoller { fifo_state: fifo_state.clone() }).detach();
373        Self {
374            block_size: info.block_size,
375            block_count: info.block_count,
376            block_flags: info.flags,
377            fifo_state,
378            temp_vmo: futures::lock::Mutex::new(temp_vmo),
379            temp_vmo_id,
380        }
381    }
382
383    fn to_blocks(&self, bytes: u64) -> Result<u64, zx::Status> {
384        if bytes % self.block_size as u64 != 0 {
385            Err(zx::Status::INVALID_ARGS)
386        } else {
387            Ok(bytes / self.block_size as u64)
388        }
389    }
390
391    // Sends the request and waits for the response.
392    async fn send(&self, mut request: BlockFifoRequest) -> Result<(), zx::Status> {
393        async move {
394            let (request_id, trace_flow_id) = {
395                let mut state = self.fifo_state.lock().unwrap();
396                if state.fifo.is_none() {
397                    // Fifo has been closed.
398                    return Err(zx::Status::CANCELED);
399                }
400                trace::duration!(
401                    c"storage",
402                    c"block_client::send::start",
403                    "op" => opcode_str(request.command.opcode)
404                );
405                let request_id = state.next_request_id;
406                state.next_request_id = state.next_request_id.overflowing_add(1).0;
407                assert!(
408                    state.map.insert(request_id, RequestState::default()).is_none(),
409                    "request id in use!"
410                );
411                request.reqid = request_id;
412                if request.trace_flow_id == NO_TRACE_ID {
413                    request.trace_flow_id = generate_trace_flow_id(request_id);
414                }
415                let trace_flow_id = request.trace_flow_id;
416                trace::flow_begin!(c"storage", c"block_client::send", trace_flow_id);
417                state.queue.push_back(request);
418                if let Some(waker) = state.poller_waker.clone() {
419                    state.poll_send_requests(&mut Context::from_waker(&waker));
420                }
421                (request_id, trace_flow_id)
422            };
423            ResponseFuture::new(self.fifo_state.clone(), request_id).await?;
424            trace::duration!(c"storage", c"block_client::send::end");
425            trace::flow_end!(c"storage", c"block_client::send", trace_flow_id);
426            Ok(())
427        }
428        .await
429    }
430
431    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
432        self.send(BlockFifoRequest {
433            command: BlockFifoCommand {
434                opcode: BlockOpcode::CloseVmo.into_primitive(),
435                flags: 0,
436                ..Default::default()
437            },
438            vmoid: vmo_id.into_id(),
439            ..Default::default()
440        })
441        .await
442    }
443
444    async fn read_at(
445        &self,
446        buffer_slice: MutableBufferSlice<'_>,
447        device_offset: u64,
448        trace_flow_id: u64,
449    ) -> Result<(), zx::Status> {
450        match buffer_slice {
451            MutableBufferSlice::VmoId { vmo_id, offset, length } => {
452                self.send(BlockFifoRequest {
453                    command: BlockFifoCommand {
454                        opcode: BlockOpcode::Read.into_primitive(),
455                        flags: 0,
456                        ..Default::default()
457                    },
458                    vmoid: vmo_id.id(),
459                    length: self
460                        .to_blocks(length)?
461                        .try_into()
462                        .map_err(|_| zx::Status::INVALID_ARGS)?,
463                    vmo_offset: self.to_blocks(offset)?,
464                    dev_offset: self.to_blocks(device_offset)?,
465                    trace_flow_id,
466                    ..Default::default()
467                })
468                .await?
469            }
470            MutableBufferSlice::Memory(mut slice) => {
471                let temp_vmo = self.temp_vmo.lock().await;
472                let mut device_block = self.to_blocks(device_offset)?;
473                loop {
474                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
475                    let block_count = self.to_blocks(to_do as u64)? as u32;
476                    self.send(BlockFifoRequest {
477                        command: BlockFifoCommand {
478                            opcode: BlockOpcode::Read.into_primitive(),
479                            flags: 0,
480                            ..Default::default()
481                        },
482                        vmoid: self.temp_vmo_id.id(),
483                        length: block_count,
484                        vmo_offset: 0,
485                        dev_offset: device_block,
486                        trace_flow_id,
487                        ..Default::default()
488                    })
489                    .await?;
490                    temp_vmo.read(&mut slice[..to_do], 0)?;
491                    if to_do == slice.len() {
492                        break;
493                    }
494                    device_block += block_count as u64;
495                    slice = &mut slice[to_do..];
496                }
497            }
498        }
499        Ok(())
500    }
501
502    async fn write_at(
503        &self,
504        buffer_slice: BufferSlice<'_>,
505        device_offset: u64,
506        opts: WriteOptions,
507        trace_flow_id: u64,
508    ) -> Result<(), zx::Status> {
509        let flags = if opts.contains(WriteOptions::FORCE_ACCESS) {
510            BlockIoFlag::FORCE_ACCESS.bits()
511        } else {
512            0
513        };
514        match buffer_slice {
515            BufferSlice::VmoId { vmo_id, offset, length } => {
516                self.send(BlockFifoRequest {
517                    command: BlockFifoCommand {
518                        opcode: BlockOpcode::Write.into_primitive(),
519                        flags,
520                        ..Default::default()
521                    },
522                    vmoid: vmo_id.id(),
523                    length: self
524                        .to_blocks(length)?
525                        .try_into()
526                        .map_err(|_| zx::Status::INVALID_ARGS)?,
527                    vmo_offset: self.to_blocks(offset)?,
528                    dev_offset: self.to_blocks(device_offset)?,
529                    trace_flow_id,
530                    ..Default::default()
531                })
532                .await?;
533            }
534            BufferSlice::Memory(mut slice) => {
535                let temp_vmo = self.temp_vmo.lock().await;
536                let mut device_block = self.to_blocks(device_offset)?;
537                loop {
538                    let to_do = std::cmp::min(TEMP_VMO_SIZE, slice.len());
539                    let block_count = self.to_blocks(to_do as u64)? as u32;
540                    temp_vmo.write(&slice[..to_do], 0)?;
541                    self.send(BlockFifoRequest {
542                        command: BlockFifoCommand {
543                            opcode: BlockOpcode::Write.into_primitive(),
544                            flags,
545                            ..Default::default()
546                        },
547                        vmoid: self.temp_vmo_id.id(),
548                        length: block_count,
549                        vmo_offset: 0,
550                        dev_offset: device_block,
551                        trace_flow_id,
552                        ..Default::default()
553                    })
554                    .await?;
555                    if to_do == slice.len() {
556                        break;
557                    }
558                    device_block += block_count as u64;
559                    slice = &slice[to_do..];
560                }
561            }
562        }
563        Ok(())
564    }
565
566    async fn trim(&self, device_range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
567        let length = self.to_blocks(device_range.end - device_range.start)? as u32;
568        let dev_offset = self.to_blocks(device_range.start)?;
569        self.send(BlockFifoRequest {
570            command: BlockFifoCommand {
571                opcode: BlockOpcode::Trim.into_primitive(),
572                flags: 0,
573                ..Default::default()
574            },
575            vmoid: block_driver::BLOCK_VMOID_INVALID,
576            length,
577            dev_offset,
578            trace_flow_id,
579            ..Default::default()
580        })
581        .await
582    }
583
584    async fn flush(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
585        self.send(BlockFifoRequest {
586            command: BlockFifoCommand {
587                opcode: BlockOpcode::Flush.into_primitive(),
588                flags: 0,
589                ..Default::default()
590            },
591            vmoid: block_driver::BLOCK_VMOID_INVALID,
592            trace_flow_id,
593            ..Default::default()
594        })
595        .await
596    }
597
598    fn block_size(&self) -> u32 {
599        self.block_size
600    }
601
602    fn block_count(&self) -> u64 {
603        self.block_count
604    }
605
606    fn block_flags(&self) -> BlockFlags {
607        self.block_flags
608    }
609
610    fn is_connected(&self) -> bool {
611        self.fifo_state.lock().unwrap().fifo.is_some()
612    }
613}
614
615impl Drop for Common {
616    fn drop(&mut self) {
617        // It's OK to leak the VMO id because the server will dump all VMOs when the fifo is torn
618        // down.
619        let _ = self.temp_vmo_id.take().into_id();
620        self.fifo_state.lock().unwrap().terminate();
621    }
622}
623
/// RemoteBlockClient is a BlockClient that communicates with a real block device over FIDL.
pub struct RemoteBlockClient {
    // Session proxy, used for attaching VMOs and closing the session.
    session: block::SessionProxy,
    // Fifo-backed request machinery and cached device geometry.
    common: Common,
}
629
/// The operations `RemoteBlockClient::new` needs from a proxy: querying block info and opening a
/// session.
pub trait AsBlockProxy {
    /// Requests the device's block info.
    fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>>;

    /// Asks the device to serve a session on the `session` server end.
    fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error>;
}
635
636impl<T: AsBlockProxy> AsBlockProxy for &T {
637    fn get_info(&self) -> impl Future<Output = Result<block::BlockGetInfoResult, fidl::Error>> {
638        AsBlockProxy::get_info(*self)
639    }
640    fn open_session(&self, session: ServerEnd<block::SessionMarker>) -> Result<(), fidl::Error> {
641        AsBlockProxy::open_session(*self, session)
642    }
643}
644
// Implements `AsBlockProxy` for the proxy type `$name` by forwarding to `$name::get_info` and
// `$name::open_session`.
// NOTE(review): this presumably relies on those paths resolving to the proxy's own methods rather
// than back to this trait impl — confirm before changing the call syntax.
macro_rules! impl_as_block_proxy {
    ($name:ident) => {
        impl AsBlockProxy for $name {
            async fn get_info(&self) -> Result<block::BlockGetInfoResult, fidl::Error> {
                $name::get_info(self).await
            }

            fn open_session(
                &self,
                session: ServerEnd<block::SessionMarker>,
            ) -> Result<(), fidl::Error> {
                $name::open_session(self, session)
            }
        }
    };
}

// The proxy types accepted by `RemoteBlockClient::new`.
impl_as_block_proxy!(BlockProxy);
impl_as_block_proxy!(PartitionProxy);
impl_as_block_proxy!(VolumeProxy);
665
666impl RemoteBlockClient {
667    /// Returns a connection to a remote block device via the given channel.
668    pub async fn new(remote: impl AsBlockProxy) -> Result<Self, zx::Status> {
669        let info =
670            remote.get_info().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
671        let (session, server) = fidl::endpoints::create_proxy();
672        let () = remote.open_session(server).map_err(fidl_to_status)?;
673        Self::from_session(info, session).await
674    }
675
676    pub async fn from_session(
677        info: block::BlockInfo,
678        session: block::SessionProxy,
679    ) -> Result<Self, zx::Status> {
680        let fifo =
681            session.get_fifo().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
682        let fifo = fasync::Fifo::from_fifo(fifo);
683        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
684        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
685        let vmo_id =
686            session.attach_vmo(dup).await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
687        let vmo_id = VmoId::new(vmo_id.id);
688        Ok(RemoteBlockClient { session, common: Common::new(fifo, &info, temp_vmo, vmo_id) })
689    }
690}
691
692#[async_trait]
693impl BlockClient for RemoteBlockClient {
694    async fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
695        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
696        let vmo_id = self
697            .session
698            .attach_vmo(dup)
699            .await
700            .map_err(fidl_to_status)?
701            .map_err(zx::Status::from_raw)?;
702        Ok(VmoId::new(vmo_id.id))
703    }
704
705    async fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
706        self.common.detach_vmo(vmo_id).await
707    }
708
709    async fn read_at_traced(
710        &self,
711        buffer_slice: MutableBufferSlice<'_>,
712        device_offset: u64,
713        trace_flow_id: u64,
714    ) -> Result<(), zx::Status> {
715        self.common.read_at(buffer_slice, device_offset, trace_flow_id).await
716    }
717
718    async fn write_at_with_opts_traced(
719        &self,
720        buffer_slice: BufferSlice<'_>,
721        device_offset: u64,
722        opts: WriteOptions,
723        trace_flow_id: u64,
724    ) -> Result<(), zx::Status> {
725        self.common.write_at(buffer_slice, device_offset, opts, trace_flow_id).await
726    }
727
728    async fn trim_traced(&self, range: Range<u64>, trace_flow_id: u64) -> Result<(), zx::Status> {
729        self.common.trim(range, trace_flow_id).await
730    }
731
732    async fn flush_traced(&self, trace_flow_id: u64) -> Result<(), zx::Status> {
733        self.common.flush(trace_flow_id).await
734    }
735
736    async fn close(&self) -> Result<(), zx::Status> {
737        let () =
738            self.session.close().await.map_err(fidl_to_status)?.map_err(zx::Status::from_raw)?;
739        Ok(())
740    }
741
742    fn block_size(&self) -> u32 {
743        self.common.block_size()
744    }
745
746    fn block_count(&self) -> u64 {
747        self.common.block_count()
748    }
749
750    fn block_flags(&self) -> BlockFlags {
751        self.common.block_flags()
752    }
753
754    fn is_connected(&self) -> bool {
755        self.common.is_connected()
756    }
757}
758
/// A blocking counterpart to `RemoteBlockClient`: its methods are synchronous, and the fifo is
/// polled on a dedicated thread.
pub struct RemoteBlockClientSync {
    // Synchronous session proxy, used for attaching VMOs and closing the session.
    session: block::SessionSynchronousProxy,
    // Fifo-backed request machinery and cached device geometry.
    common: Common,
}
763
764impl RemoteBlockClientSync {
765    /// Returns a connection to a remote block device via the given channel, but spawns a separate
766    /// thread for polling the fifo which makes it work in cases where no executor is configured for
767    /// the calling thread.
768    pub fn new(
769        client_end: fidl::endpoints::ClientEnd<block::BlockMarker>,
770    ) -> Result<Self, zx::Status> {
771        let remote = block::BlockSynchronousProxy::new(client_end.into_channel());
772        let info = remote
773            .get_info(zx::MonotonicInstant::INFINITE)
774            .map_err(fidl_to_status)?
775            .map_err(zx::Status::from_raw)?;
776        let (client, server) = fidl::endpoints::create_endpoints();
777        let () = remote.open_session(server).map_err(fidl_to_status)?;
778        let session = block::SessionSynchronousProxy::new(client.into_channel());
779        let fifo = session
780            .get_fifo(zx::MonotonicInstant::INFINITE)
781            .map_err(fidl_to_status)?
782            .map_err(zx::Status::from_raw)?;
783        let temp_vmo = zx::Vmo::create(TEMP_VMO_SIZE as u64)?;
784        let dup = temp_vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
785        let vmo_id = session
786            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
787            .map_err(fidl_to_status)?
788            .map_err(zx::Status::from_raw)?;
789        let vmo_id = VmoId::new(vmo_id.id);
790
791        // The fifo needs to be instantiated from the thread that has the executor as that's where
792        // the fifo registers for notifications to be delivered.
793        let (sender, receiver) = oneshot::channel::<Result<Self, zx::Status>>();
794        std::thread::spawn(move || {
795            let mut executor = fasync::LocalExecutor::new();
796            let fifo = fasync::Fifo::from_fifo(fifo);
797            let common = Common::new(fifo, &info, temp_vmo, vmo_id);
798            let fifo_state = common.fifo_state.clone();
799            let _ = sender.send(Ok(RemoteBlockClientSync { session, common }));
800            executor.run_singlethreaded(FifoPoller { fifo_state });
801        });
802        block_on(receiver).map_err(|_| zx::Status::CANCELED)?
803    }
804
805    pub fn attach_vmo(&self, vmo: &zx::Vmo) -> Result<VmoId, zx::Status> {
806        let dup = vmo.duplicate_handle(zx::Rights::SAME_RIGHTS)?;
807        let vmo_id = self
808            .session
809            .attach_vmo(dup, zx::MonotonicInstant::INFINITE)
810            .map_err(fidl_to_status)?
811            .map_err(zx::Status::from_raw)?;
812        Ok(VmoId::new(vmo_id.id))
813    }
814
815    pub fn detach_vmo(&self, vmo_id: VmoId) -> Result<(), zx::Status> {
816        block_on(self.common.detach_vmo(vmo_id))
817    }
818
819    pub fn read_at(
820        &self,
821        buffer_slice: MutableBufferSlice<'_>,
822        device_offset: u64,
823    ) -> Result<(), zx::Status> {
824        block_on(self.common.read_at(buffer_slice, device_offset, NO_TRACE_ID))
825    }
826
827    pub fn write_at(
828        &self,
829        buffer_slice: BufferSlice<'_>,
830        device_offset: u64,
831    ) -> Result<(), zx::Status> {
832        block_on(self.common.write_at(
833            buffer_slice,
834            device_offset,
835            WriteOptions::empty(),
836            NO_TRACE_ID,
837        ))
838    }
839
840    pub fn flush(&self) -> Result<(), zx::Status> {
841        block_on(self.common.flush(NO_TRACE_ID))
842    }
843
844    pub fn close(&self) -> Result<(), zx::Status> {
845        let () = self
846            .session
847            .close(zx::MonotonicInstant::INFINITE)
848            .map_err(fidl_to_status)?
849            .map_err(zx::Status::from_raw)?;
850        Ok(())
851    }
852
853    pub fn block_size(&self) -> u32 {
854        self.common.block_size()
855    }
856
857    pub fn block_count(&self) -> u64 {
858        self.common.block_count()
859    }
860
861    pub fn is_connected(&self) -> bool {
862        self.common.is_connected()
863    }
864}
865
866impl Drop for RemoteBlockClientSync {
867    fn drop(&mut self) {
868        // Ignore errors here as there is not much we can do about it.
869        let _ = self.close();
870    }
871}
872
// FifoPoller is a future responsible for sending and receiving from the fifo.  It completes once
// the fifo has been terminated.
struct FifoPoller {
    // State shared with Common and the ResponseFutures.
    fifo_state: FifoStateRef,
}
877
impl Future for FifoPoller {
    type Output = ();

    // Flushes queued requests, then drains every response that is currently readable, publishing
    // each status to the matching RequestState and waking its waiter.  Returns Ready once the
    // fifo is gone or a read/write fails; otherwise records its waker and returns Pending.
    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        let mut state_lock = self.fifo_state.lock().unwrap();
        let state = state_lock.deref_mut(); // So that we can split the borrow.

        // Send requests.
        if state.poll_send_requests(context) {
            return Poll::Ready(());
        }

        // Receive responses.
        let fifo = state.fifo.as_ref().unwrap(); // Safe because poll_send_requests checks.
        while let Poll::Ready(result) = fifo.read_one(context) {
            match result {
                Ok(response) => {
                    let request_id = response.reqid;
                    // If the request isn't in the map, assume that it's a cancelled read.
                    if let Some(request_state) = state.map.get_mut(&request_id) {
                        request_state.result.replace(zx::Status::from_raw(response.status));
                        if let Some(waker) = request_state.waker.take() {
                            waker.wake();
                        }
                    }
                }
                Err(_) => {
                    // A failed read tears the connection down; terminate() cancels whatever is
                    // still outstanding.
                    state.terminate();
                    return Poll::Ready(());
                }
            }
        }

        // Remember the waker so that senders can push new requests out on the poller's behalf
        // (see Common::send).
        state.poller_waker = Some(context.waker().clone());
        Poll::Pending
    }
}
915
916#[cfg(test)]
917mod tests {
918    use super::{
919        BlockClient, BlockFifoRequest, BlockFifoResponse, BufferSlice, MutableBufferSlice,
920        RemoteBlockClient, RemoteBlockClientSync, WriteOptions,
921    };
922    use block_server::{BlockServer, DeviceInfo, PartitionInfo};
923    use fidl::endpoints::RequestStream as _;
924    use fidl_fuchsia_hardware_block as block;
925    use fuchsia_async::{self as fasync, FifoReadable as _, FifoWritable as _};
926    use futures::future::{AbortHandle, Abortable, TryFutureExt as _};
927    use futures::join;
928    use futures::stream::futures_unordered::FuturesUnordered;
929    use futures::stream::StreamExt as _;
930    use ramdevice_client::RamdiskClient;
931    use std::borrow::Cow;
932    use std::num::NonZero;
933    use std::sync::atomic::{AtomicBool, Ordering};
934    use std::sync::Arc;
935
936    const RAMDISK_BLOCK_SIZE: u64 = 1024;
937    const RAMDISK_BLOCK_COUNT: u64 = 1024;
938
939    pub async fn make_ramdisk() -> (RamdiskClient, block::BlockProxy, RemoteBlockClient) {
940        let ramdisk = RamdiskClient::create(RAMDISK_BLOCK_SIZE, RAMDISK_BLOCK_COUNT)
941            .await
942            .expect("RamdiskClient::create failed");
943        let client_end = ramdisk.open().expect("ramdisk.open failed");
944        let proxy = client_end.into_proxy();
945        let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
946        assert_eq!(block_client.block_size(), 1024);
947        let client_end = ramdisk.open().expect("ramdisk.open failed");
948        let proxy = client_end.into_proxy();
949        (ramdisk, proxy, block_client)
950    }
951
952    #[fuchsia::test]
953    async fn test_against_ram_disk() {
954        let (_ramdisk, block_proxy, block_client) = make_ramdisk().await;
955
956        let stats_before = block_proxy
957            .get_stats(false)
958            .await
959            .expect("get_stats failed")
960            .map_err(zx::Status::from_raw)
961            .expect("get_stats error");
962
963        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
964        vmo.write(b"hello", 5).expect("vmo.write failed");
965        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
966        block_client
967            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
968            .await
969            .expect("write_at failed");
970        block_client
971            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
972            .await
973            .expect("read_at failed");
974        let mut buf: [u8; 5] = Default::default();
975        vmo.read(&mut buf, 1029).expect("vmo.read failed");
976        assert_eq!(&buf, b"hello");
977        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
978
979        // check that the stats are what we expect them to be
980        let stats_after = block_proxy
981            .get_stats(false)
982            .await
983            .expect("get_stats failed")
984            .map_err(zx::Status::from_raw)
985            .expect("get_stats error");
986        // write stats
987        assert_eq!(
988            stats_before.write.success.total_calls + 1,
989            stats_after.write.success.total_calls
990        );
991        assert_eq!(
992            stats_before.write.success.bytes_transferred + 1024,
993            stats_after.write.success.bytes_transferred
994        );
995        assert_eq!(stats_before.write.failure.total_calls, stats_after.write.failure.total_calls);
996        // read stats
997        assert_eq!(stats_before.read.success.total_calls + 1, stats_after.read.success.total_calls);
998        assert_eq!(
999            stats_before.read.success.bytes_transferred + 2048,
1000            stats_after.read.success.bytes_transferred
1001        );
1002        assert_eq!(stats_before.read.failure.total_calls, stats_after.read.failure.total_calls);
1003    }
1004
1005    #[fuchsia::test]
1006    async fn test_against_ram_disk_with_flush() {
1007        let (_ramdisk, block_proxy, block_client) = make_ramdisk().await;
1008
1009        let stats_before = block_proxy
1010            .get_stats(false)
1011            .await
1012            .expect("get_stats failed")
1013            .map_err(zx::Status::from_raw)
1014            .expect("get_stats error");
1015
1016        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1017        vmo.write(b"hello", 5).expect("vmo.write failed");
1018        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1019        block_client
1020            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1021            .await
1022            .expect("write_at failed");
1023        block_client.flush().await.expect("flush failed");
1024        block_client
1025            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 1024, 2048), 0)
1026            .await
1027            .expect("read_at failed");
1028        let mut buf: [u8; 5] = Default::default();
1029        vmo.read(&mut buf, 1029).expect("vmo.read failed");
1030        assert_eq!(&buf, b"hello");
1031        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1032
1033        // check that the stats are what we expect them to be
1034        let stats_after = block_proxy
1035            .get_stats(false)
1036            .await
1037            .expect("get_stats failed")
1038            .map_err(zx::Status::from_raw)
1039            .expect("get_stats error");
1040        // write stats
1041        assert_eq!(
1042            stats_before.write.success.total_calls + 1,
1043            stats_after.write.success.total_calls
1044        );
1045        assert_eq!(
1046            stats_before.write.success.bytes_transferred + 1024,
1047            stats_after.write.success.bytes_transferred
1048        );
1049        assert_eq!(stats_before.write.failure.total_calls, stats_after.write.failure.total_calls);
1050        // flush stats
1051        assert_eq!(
1052            stats_before.flush.success.total_calls + 1,
1053            stats_after.flush.success.total_calls
1054        );
1055        assert_eq!(stats_before.flush.failure.total_calls, stats_after.flush.failure.total_calls);
1056        // read stats
1057        assert_eq!(stats_before.read.success.total_calls + 1, stats_after.read.success.total_calls);
1058        assert_eq!(
1059            stats_before.read.success.bytes_transferred + 2048,
1060            stats_after.read.success.bytes_transferred
1061        );
1062        assert_eq!(stats_before.read.failure.total_calls, stats_after.read.failure.total_calls);
1063    }
1064
1065    #[fuchsia::test]
1066    async fn test_alignment() {
1067        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1068        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1069        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1070        block_client
1071            .write_at(BufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 1)
1072            .await
1073            .expect_err("expected failure due to bad alignment");
1074        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1075    }
1076
1077    #[fuchsia::test]
1078    async fn test_parallel_io() {
1079        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1080        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1081        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1082        let mut reads = Vec::new();
1083        for _ in 0..1024 {
1084            reads.push(
1085                block_client
1086                    .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1087                    .inspect_err(|e| panic!("read should have succeeded: {}", e)),
1088            );
1089        }
1090        futures::future::join_all(reads).await;
1091        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1092    }
1093
1094    #[fuchsia::test]
1095    async fn test_closed_device() {
1096        let (ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1097        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1098        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1099        let mut reads = Vec::new();
1100        for _ in 0..1024 {
1101            reads.push(
1102                block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1103            );
1104        }
1105        assert!(block_client.is_connected());
1106        let _ = futures::join!(futures::future::join_all(reads), async {
1107            ramdisk.destroy().await.expect("ramdisk.destroy failed")
1108        });
1109        // Destroying the ramdisk is asynchronous. Keep issuing reads until they start failing.
1110        while block_client
1111            .read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0)
1112            .await
1113            .is_ok()
1114        {}
1115
1116        // Sometimes the FIFO will start rejecting requests before FIFO is actually closed, so we
1117        // get false-positives from is_connected.
1118        while block_client.is_connected() {
1119            // Sleep for a bit to minimise lock contention.
1120            fasync::Timer::new(fasync::MonotonicInstant::after(
1121                zx::MonotonicDuration::from_millis(500),
1122            ))
1123            .await;
1124        }
1125
1126        // But once is_connected goes negative, it should stay negative.
1127        assert_eq!(block_client.is_connected(), false);
1128        let _ = block_client.detach_vmo(vmo_id).await;
1129    }
1130
1131    #[fuchsia::test]
1132    async fn test_cancelled_reads() {
1133        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1134        let vmo = zx::Vmo::create(131072).expect("Vmo::create failed");
1135        let vmo_id = block_client.attach_vmo(&vmo).await.expect("attach_vmo failed");
1136        {
1137            let mut reads = FuturesUnordered::new();
1138            for _ in 0..1024 {
1139                reads.push(
1140                    block_client.read_at(MutableBufferSlice::new_with_vmo_id(&vmo_id, 0, 1024), 0),
1141                );
1142            }
1143            // Read the first 500 results and then dump the rest.
1144            for _ in 0..500 {
1145                reads.next().await;
1146            }
1147        }
1148        block_client.detach_vmo(vmo_id).await.expect("detach_vmo failed");
1149    }
1150
1151    #[fuchsia::test]
1152    async fn test_parallel_large_read_and_write_with_memory_succeds() {
1153        let (_ramdisk, _block_proxy, block_client) = make_ramdisk().await;
1154        let block_client_ref = &block_client;
1155        let test_one = |offset, len, fill| async move {
1156            let buf = vec![fill; len];
1157            block_client_ref.write_at(buf[..].into(), offset).await.expect("write_at failed");
1158            // Read back an extra block either side.
1159            let mut read_buf = vec![0u8; len + 2 * RAMDISK_BLOCK_SIZE as usize];
1160            block_client_ref
1161                .read_at(read_buf.as_mut_slice().into(), offset - RAMDISK_BLOCK_SIZE)
1162                .await
1163                .expect("read_at failed");
1164            assert_eq!(
1165                &read_buf[0..RAMDISK_BLOCK_SIZE as usize],
1166                &[0; RAMDISK_BLOCK_SIZE as usize][..]
1167            );
1168            assert_eq!(
1169                &read_buf[RAMDISK_BLOCK_SIZE as usize..RAMDISK_BLOCK_SIZE as usize + len],
1170                &buf[..]
1171            );
1172            assert_eq!(
1173                &read_buf[RAMDISK_BLOCK_SIZE as usize + len..],
1174                &[0; RAMDISK_BLOCK_SIZE as usize][..]
1175            );
1176        };
1177        const WRITE_LEN: usize = super::TEMP_VMO_SIZE * 3 + RAMDISK_BLOCK_SIZE as usize;
1178        join!(
1179            test_one(RAMDISK_BLOCK_SIZE, WRITE_LEN, 0xa3u8),
1180            test_one(2 * RAMDISK_BLOCK_SIZE + WRITE_LEN as u64, WRITE_LEN, 0x7fu8)
1181        );
1182    }
1183
1184    // Implements dummy server which can be used by test cases to verify whether
1185    // channel messages and fifo operations are being received - by using set_channel_handler or
1186    // set_fifo_hander respectively
1187    struct FakeBlockServer<'a> {
1188        server_channel: Option<fidl::endpoints::ServerEnd<block::BlockMarker>>,
1189        channel_handler: Box<dyn Fn(&block::SessionRequest) -> bool + 'a>,
1190        fifo_handler: Box<dyn Fn(BlockFifoRequest) -> BlockFifoResponse + 'a>,
1191    }
1192
1193    impl<'a> FakeBlockServer<'a> {
1194        // Creates a new FakeBlockServer given a channel to listen on.
1195        //
1196        // 'channel_handler' and 'fifo_handler' closures allow for customizing the way how the server
1197        // handles requests received from channel or the fifo respectfully.
1198        //
1199        // 'channel_handler' receives a message before it is handled by the default implementation
1200        // and can return 'true' to indicate all processing is done and no further processing of
1201        // that message is required
1202        //
1203        // 'fifo_handler' takes as input a BlockFifoRequest and produces a response which the
1204        // FakeBlockServer will send over the fifo.
1205        fn new(
1206            server_channel: fidl::endpoints::ServerEnd<block::BlockMarker>,
1207            channel_handler: impl Fn(&block::SessionRequest) -> bool + 'a,
1208            fifo_handler: impl Fn(BlockFifoRequest) -> BlockFifoResponse + 'a,
1209        ) -> FakeBlockServer<'a> {
1210            FakeBlockServer {
1211                server_channel: Some(server_channel),
1212                channel_handler: Box::new(channel_handler),
1213                fifo_handler: Box::new(fifo_handler),
1214            }
1215        }
1216
1217        // Runs the server
1218        async fn run(&mut self) {
1219            let server = self.server_channel.take().unwrap();
1220
1221            // Set up a mock server.
1222            let (server_fifo, client_fifo) =
1223                zx::Fifo::<BlockFifoRequest, BlockFifoResponse>::create(16)
1224                    .expect("Fifo::create failed");
1225            let maybe_server_fifo = std::sync::Mutex::new(Some(client_fifo));
1226
1227            let (fifo_future_abort, fifo_future_abort_registration) = AbortHandle::new_pair();
1228            let fifo_future = Abortable::new(
1229                async {
1230                    let fifo = fasync::Fifo::from_fifo(server_fifo);
1231                    loop {
1232                        let request = match fifo.read_entry().await {
1233                            Ok(r) => r,
1234                            Err(zx::Status::PEER_CLOSED) => break,
1235                            Err(e) => panic!("read_entry failed {:?}", e),
1236                        };
1237
1238                        let response = self.fifo_handler.as_ref()(request);
1239                        fifo.write_entries(std::slice::from_ref(&response))
1240                            .await
1241                            .expect("write_entries failed");
1242                    }
1243                },
1244                fifo_future_abort_registration,
1245            );
1246
1247            let channel_future = async {
1248                server
1249                    .into_stream()
1250                    .for_each_concurrent(None, |request| async {
1251                        let request = request.expect("unexpected fidl error");
1252
1253                        match request {
1254                            block::BlockRequest::GetInfo { responder } => {
1255                                responder
1256                                    .send(Ok(&block::BlockInfo {
1257                                        block_count: 1024,
1258                                        block_size: 512,
1259                                        max_transfer_size: 1024 * 1024,
1260                                        flags: block::Flag::empty(),
1261                                    }))
1262                                    .expect("send failed");
1263                            }
1264                            block::BlockRequest::OpenSession { session, control_handle: _ } => {
1265                                let stream = session.into_stream();
1266                                stream
1267                                    .for_each(|request| async {
1268                                        let request = request.expect("unexpected fidl error");
1269                                        // Give a chance for the test to register and potentially
1270                                        // handle the event
1271                                        if self.channel_handler.as_ref()(&request) {
1272                                            return;
1273                                        }
1274                                        match request {
1275                                            block::SessionRequest::GetFifo { responder } => {
1276                                                match maybe_server_fifo.lock().unwrap().take() {
1277                                                    Some(fifo) => {
1278                                                        responder.send(Ok(fifo.downcast()))
1279                                                    }
1280                                                    None => responder.send(Err(
1281                                                        zx::Status::NO_RESOURCES.into_raw(),
1282                                                    )),
1283                                                }
1284                                                .expect("send failed")
1285                                            }
1286                                            block::SessionRequest::AttachVmo {
1287                                                vmo: _,
1288                                                responder,
1289                                            } => responder
1290                                                .send(Ok(&block::VmoId { id: 1 }))
1291                                                .expect("send failed"),
1292                                            block::SessionRequest::Close { responder } => {
1293                                                fifo_future_abort.abort();
1294                                                responder.send(Ok(())).expect("send failed")
1295                                            }
1296                                        }
1297                                    })
1298                                    .await
1299                            }
1300                            _ => panic!("Unexpected message"),
1301                        }
1302                    })
1303                    .await;
1304            };
1305
1306            let _result = join!(fifo_future, channel_future);
1307            //_result can be Err(Aborted) since FifoClose calls .abort but that's expected
1308        }
1309    }
1310
1311    #[fuchsia::test]
1312    async fn test_block_close_is_called() {
1313        let close_called = std::sync::Mutex::new(false);
1314        let (client_end, server) = fidl::endpoints::create_endpoints::<block::BlockMarker>();
1315
1316        std::thread::spawn(move || {
1317            let _block_client =
1318                RemoteBlockClientSync::new(client_end).expect("RemoteBlockClientSync::new failed");
1319            // The drop here should cause Close to be sent.
1320        });
1321
1322        let channel_handler = |request: &block::SessionRequest| -> bool {
1323            if let block::SessionRequest::Close { .. } = request {
1324                *close_called.lock().unwrap() = true;
1325            }
1326            false
1327        };
1328        FakeBlockServer::new(server, channel_handler, |_| unreachable!()).run().await;
1329
1330        // After the server has finished running, we can check to see that close was called.
1331        assert!(*close_called.lock().unwrap());
1332    }
1333
1334    #[fuchsia::test]
1335    async fn test_block_flush_is_called() {
1336        let (proxy, stream) = fidl::endpoints::create_proxy_and_stream::<block::BlockMarker>();
1337
1338        struct Interface {
1339            flush_called: Arc<AtomicBool>,
1340        }
1341        impl block_server::async_interface::Interface for Interface {
1342            async fn get_info(&self) -> Result<Cow<'_, DeviceInfo>, zx::Status> {
1343                Ok(Cow::Owned(DeviceInfo::Partition(PartitionInfo {
1344                    device_flags: fidl_fuchsia_hardware_block::Flag::empty(),
1345                    block_range: Some(0..1000),
1346                    type_guid: [0; 16],
1347                    instance_guid: [0; 16],
1348                    name: "foo".to_string(),
1349                    flags: 0,
1350                })))
1351            }
1352
1353            async fn read(
1354                &self,
1355                _device_block_offset: u64,
1356                _block_count: u32,
1357                _vmo: &Arc<zx::Vmo>,
1358                _vmo_offset: u64,
1359                _trace_flow_id: Option<NonZero<u64>>,
1360            ) -> Result<(), zx::Status> {
1361                unreachable!();
1362            }
1363
1364            async fn write(
1365                &self,
1366                _device_block_offset: u64,
1367                _block_count: u32,
1368                _vmo: &Arc<zx::Vmo>,
1369                _vmo_offset: u64,
1370                _opts: WriteOptions,
1371                _trace_flow_id: Option<NonZero<u64>>,
1372            ) -> Result<(), zx::Status> {
1373                unreachable!();
1374            }
1375
1376            async fn flush(&self, _trace_flow_id: Option<NonZero<u64>>) -> Result<(), zx::Status> {
1377                self.flush_called.store(true, Ordering::Relaxed);
1378                Ok(())
1379            }
1380
1381            async fn trim(
1382                &self,
1383                _device_block_offset: u64,
1384                _block_count: u32,
1385                _trace_flow_id: Option<NonZero<u64>>,
1386            ) -> Result<(), zx::Status> {
1387                unreachable!();
1388            }
1389        }
1390
1391        let flush_called = Arc::new(AtomicBool::new(false));
1392
1393        futures::join!(
1394            async {
1395                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1396
1397                block_client.flush().await.expect("flush failed");
1398            },
1399            async {
1400                let block_server = BlockServer::new(
1401                    512,
1402                    Arc::new(Interface { flush_called: flush_called.clone() }),
1403                );
1404                block_server.handle_requests(stream.cast_stream()).await.unwrap();
1405            }
1406        );
1407
1408        assert!(flush_called.load(Ordering::Relaxed));
1409    }
1410
1411    #[fuchsia::test]
1412    async fn test_trace_flow_ids_set() {
1413        let (proxy, server) = fidl::endpoints::create_proxy();
1414
1415        futures::join!(
1416            async {
1417                let block_client = RemoteBlockClient::new(proxy).await.expect("new failed");
1418                block_client.flush().await.expect("flush failed");
1419            },
1420            async {
1421                let flow_id: std::sync::Mutex<Option<u64>> = std::sync::Mutex::new(None);
1422                let fifo_handler = |request: BlockFifoRequest| -> BlockFifoResponse {
1423                    if request.trace_flow_id > 0 {
1424                        *flow_id.lock().unwrap() = Some(request.trace_flow_id);
1425                    }
1426                    BlockFifoResponse {
1427                        status: zx::Status::OK.into_raw(),
1428                        reqid: request.reqid,
1429                        ..Default::default()
1430                    }
1431                };
1432                FakeBlockServer::new(server, |_| false, fifo_handler).run().await;
1433                // After the server has finished running, verify the trace flow ID was set to some value.
1434                assert!(flow_id.lock().unwrap().is_some());
1435            }
1436        );
1437    }
1438}